Python 中的多線程其實并不是真正的多線程,如果想要充分地使用多核 CPU 的資源,在 Python 中大部分情況需要使用多進程。Python 提供了非常好用的多進程包 multiprocessing,只需要定義一個函數(shù),Python 會完成其他所有事情。借助這個包,可以輕松完成從單進程到并發(fā)執(zhí)行的轉換。multiprocessing 支持子進程、通信和共享數(shù)據(jù)、執(zhí)行不同形式的同步,提供了 Process、Queue、Pipe、Lock 等組件。
創(chuàng)建進程的類:Process([group [, target [, name [, args [, kwargs]]]]])
下面看一個創(chuàng)建函數(shù)并將其作為多個進程的例子:
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval, name):
print(name + '【start】')
time.sleep(interval)
print(name + '【end】')
if __name__ == "__main__":
p1 = multiprocessing.Process(target=worker, args=(2, '兩點水1'))
p2 = multiprocessing.Process(target=worker, args=(3, '兩點水2'))
p3 = multiprocessing.Process(target=worker, args=(4, '兩點水3'))
p1.start()
p2.start()
p3.start()
print("The number of CPU is:" + str(multiprocessing.cpu_count()))
for p in multiprocessing.active_children():
print("child p.name:" + p.name + "\tp.id" + str(p.pid))
print("END!!!!!!!!!!!!!!!!!")
輸出的結果:
當然我們也可以把進程創(chuàng)建成一個類,如下面的例子,當進程 p 調(diào)用 start() 時,自動調(diào)用 run() 方法。
# -*- coding: UTF-8 -*-
import multiprocessing
import time
class ClockProcess(multiprocessing.Process):
def __init__(self, interval):
multiprocessing.Process.__init__(self)
self.interval = interval
def run(self):
n = 5
while n > 0:
print("當前時間: {0}".format(time.ctime()))
time.sleep(self.interval)
n -= 1
if __name__ == '__main__':
p = ClockProcess(3)
p.start()
輸出結果如下:
想知道 daemon 屬性有什么用,看下下面兩個例子吧,一個加了 daemon 屬性,一個沒有加,對比輸出的結果:
沒有加 deamon 屬性的例子:
# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval):
print('工作開始時間:{0}'.format(time.ctime()))
time.sleep(interval)
print('工作結果時間:{0}'.format(time.ctime()))
if __name__ == '__main__':
p = multiprocessing.Process(target=worker, args=(3,))
p.start()
print('【EMD】')
輸出結果:
【EMD】
工作開始時間:Mon Oct 9 17:47:06 2017
工作結果時間:Mon Oct 9 17:47:09 2017
在上面示例中,進程 p 添加 daemon 屬性:
# -*- coding: UTF-8 -*-
import multiprocessing
import time
def worker(interval):
print('工作開始時間:{0}'.format(time.ctime()))
time.sleep(interval)
print('工作結果時間:{0}'.format(time.ctime()))
if __name__ == '__main__':
p = multiprocessing.Process(target=worker, args=(3,))
p.daemon = True
p.start()
print('【EMD】')
輸出結果:
【EMD】
根據(jù)輸出結果可見,如果在子進程中添加了 daemon 屬性,那么當主進程結束的時候,子進程也會跟著結束。所以沒有打印子進程的信息。
結合上面的例子繼續(xù),如果我們想要讓子線程執(zhí)行完該怎么做呢?
那么我們可以用到 join 方法,join 方法的主要作用是:阻塞當前進程,直到調(diào)用 join 方法的那個進程執(zhí)行完,再繼續(xù)執(zhí)行當前進程。
因此看下加了 join 方法的例子:
import multiprocessing
import time
def worker(interval):
print('工作開始時間:{0}'.format(time.ctime()))
time.sleep(interval)
print('工作結果時間:{0}'.format(time.ctime()))
if __name__ == '__main__':
p = multiprocessing.Process(target=worker, args=(3,))
p.daemon = True
p.start()
p.join()
print('【EMD】')
輸出的結果:
工作開始時間:Tue Oct 10 11:30:08 2017
工作結果時間:Tue Oct 10 11:30:11 2017
【EMD】
如果需要很多的子進程,難道我們需要一個一個的去創(chuàng)建嗎?
當然不用,我們可以使用進程池的方法批量創(chuàng)建子進程。
例子如下:
# -*- coding: UTF-8 -*-
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('進程的名稱:{0} ;進程的PID: {1} '.format(name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('進程 {0} 運行了 {1} 秒'.format(name, (end - start)))
if __name__ == '__main__':
print('主進程的 PID:{0}'.format(os.getpid()))
p = Pool(4)
for i in range(6):
p.apply_async(long_time_task, args=(i,))
p.close()
# 等待所有子進程結束后在關閉主進程
p.join()
print('【End】')
輸出的結果如下:
主進程的 PID:7256
進程的名稱:0 ;進程的PID: 1492
進程的名稱:1 ;進程的PID: 12232
進程的名稱:2 ;進程的PID: 4332
進程的名稱:3 ;進程的PID: 11604
進程 2 運行了 0.6500370502471924 秒
進程的名稱:4 ;進程的PID: 4332
進程 1 運行了 1.0830621719360352 秒
進程的名稱:5 ;進程的PID: 12232
進程 5 運行了 0.029001712799072266 秒
進程 4 運行了 0.9720554351806641 秒
進程 0 運行了 2.3181326389312744 秒
進程 3 運行了 2.5331451892852783 秒
【End】
這里有一點需要注意: Pool 對象調(diào)用 join() 方法會等待所有子進程執(zhí)行完畢,調(diào)用 join() 之前必須先調(diào)用 close() ,調(diào)用close() 之后就不能繼續(xù)添加新的 Process 了。
請注意輸出的結果,子進程 0,1,2,3是立刻執(zhí)行的,而子進程 4 要等待前面某個子進程完成后才執(zhí)行,這是因為 Pool 的默認大小在我的電腦上是 4,因此,最多同時執(zhí)行 4 個進程。這是 Pool 有意設計的限制,并不是操作系統(tǒng)的限制。如果改成:
p = Pool(5)
就可以同時跑 5 個進程。
Process 之間肯定是需要通信的,操作系統(tǒng)提供了很多機制來實現(xiàn)進程間的通信。Python 的 multiprocessing 模塊包裝了底層的機制,提供了Queue、Pipes 等多種方式來交換數(shù)據(jù)。
以 Queue 為例,在父進程中創(chuàng)建兩個子進程,一個往 Queue 里寫數(shù)據(jù),一個從 Queue 里讀數(shù)據(jù):
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
from multiprocessing import Process, Queue
import os, time, random
def write(q):
# 寫數(shù)據(jù)進程
print('寫進程的PID:{0}'.format(os.getpid()))
for value in ['兩點水', '三點水', '四點水']:
print('寫進 Queue 的值為:{0}'.format(value))
q.put(value)
time.sleep(random.random())
def read(q):
# 讀取數(shù)據(jù)進程
print('讀進程的PID:{0}'.format(os.getpid()))
while True:
value = q.get(True)
print('從 Queue 讀取的值為:{0}'.format(value))
if __name__ == '__main__':
# 父進程創(chuàng)建 Queue,并傳給各個子進程
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子進程 pw
pw.start()
# 啟動子進程pr
pr.start()
# 等待pw結束:
pw.join()
# pr 進程里是死循環(huán),無法等待其結束,只能強行終止
pr.terminate()
輸出的結果為:
讀進程的PID:13208
寫進程的PID:10864
寫進 Queue 的值為:兩點水
從 Queue 讀取的值為:兩點水
寫進 Queue 的值為:三點水
從 Queue 讀取的值為:三點水
寫進 Queue 的值為:四點水
從 Queue 讀取的值為:四點水