實踐環境
Python3.6
介紹
multiprocessing
是一個支持使用類似于執行緒模塊的API派生行程的包,該包同時提供本地和遠程并發,通過使用子行程而不是執行緒,有效地避開了全域解釋器鎖,因此,multiprocessing
模塊允許程式員充分利用給定機器上的多個處理器,它同時在Unix和Windows上運行,
該模塊還引入了在執行緒模塊中沒有類似程式的API,這方面的一個主要例子是Pool
物件,它提供了一種方便的方法,可以在多個輸入值的情況下,為行程之間分配輸入資料(資料并行),實作并行執行函式,以下示例演示了在模塊中定義此類函式,以便子行程能夠成功匯入該模塊的常見做法,這個使用Pool
實作資料并行的基本示例
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
控制臺輸出:
[1, 4, 9]
Process類
在multiprocessing
中,行程是通過創建一個Process
類并呼叫其start()
方法來派生的,Process
遵循threading.Thread
的API,multiprocess
程式的一個微小的例子:
from multiprocessing import Process
def f(name):
print('hello', name) # 輸出:hello shouke
if __name__ == '__main__':
p = Process(target=f, args=('shouke',))
p.start()
p.join()
下面是一個擴展示例,顯示所涉及的各個行程ID:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('shouke',))
p.start()
p.join()
控制臺輸出:
main line
module name: __main__
parent process: 13080
process id: 20044
function f
module name: __mp_main__
parent process: 20044
process id: 28952
hello shouke
背景關系和啟動方法
根據平臺的不同,multiprocessing
支持三種啟動行程的方式,這些啟動方法是
-
spawn
父行程啟動一個新的python解釋器行程,子行程將只繼承那些運行行程物件
run()
方法所需的資源,特別是,來自父行程的不必要的檔案描述符和句柄將不會被繼承,與使用fork或forkserver相比,使用此方法啟動行程相當慢,可在Unix和Windows上使用,Windows上默認使用該啟動方法, -
fork
父行程使用
os.fork()
來fork Python解釋器,子行程在開始時實際上與父行程相同,父行程的所有資源都由子行程繼承,請注意,安全地fork多執行緒行程是有問題的,僅在Unix上可用,Unix上默認會用該方法, -
forkserver
當程式啟動并選擇forkserver啟動方法時,服務器行程就會啟動,從那時起,每當需要新行程時,父行程都會連接到服務器,并請求它fork一個新行程,fork服務器行程是單執行緒的,因此使用
os.fork()
是安全的,不會繼承不必要的資源,在支持通過Unix管道傳遞檔案描述符的Unix平臺上可用,
To select a start method you use the set_start_method()
in the if __name__ == '__main__'
clause of the main module. For example
在3.4版本中進行了更改:在所有unix平臺上添加了spawn,并為一些unix平臺添加了forkserver,子行程不再繼承Windows上的所有父級可繼承句柄,
在Unix上,使用spawn或forkserver啟動方法還將啟動一個信號量跟蹤器行程,該行程跟蹤程式行程創建的未鏈接的命名信號量,當所有行程都退出時,信號量跟蹤器將取消任何剩余信號量的鏈接,通常應該沒有剩余信號量,但如果一個行程被信號殺死,可能會有一些“泄露”的信號量,(取消命名信號量的鏈接是一個嚴重的問題,因為系統只允許有限的數量,并且在下次重新啟動之前不會自動取消鏈接,)
要選擇啟動方法,請在主模塊的 if __name__ == '__main__'
子句中使用set_start_method()
,例如
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get()) # 輸出 hello
p.join()
set_start_method()
在一個程式中只能用一次
或者,也可以使用get_context()
來獲取背景關系物件,背景關系物件與multiprocessing
模塊具有相同的API,并允許在同一程式中使用多個啟動方法,
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
請注意,與一個背景關系相關的物件可能與不同背景關系的行程不兼容,特別是,使用fork背景關系創建的鎖不能傳遞給使用spawn或forkserver啟動方法啟動的行程,
想要使用特定啟動方法的庫可能應該使用get_context()
來避免干擾庫用戶的選擇
在行程之間交換物件
multiprocessing
支持行程之間的兩種通信信道
-
佇列
multiprocessing.Queue
類近乎是queue.Queue
的克隆. 例如:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
佇列是執行緒和行程安全的,
錯誤用法示例如下:
from multiprocessing import Process, Queue q = Queue() def f(): global q q.put([42, None, 'hello']) if __name__ == '__main__': p = Process(target=f) p.start() print(q.get()) # 取不到值 p.join()
涉及到類的時候咋處理呢?示例如下
from multiprocessing import Process, Queue class TestClass: def __init__(self, q): self.q = q def f(self): self.q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() obj = TestClass(q) p = Process(target=obj.f) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
或者
from multiprocessing import Process, Queue q = Queue() class TestClass: def f(self, q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() obj = TestClass() p = Process(target=obj.f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()
特別需要注意的是,由行程呼叫的target類函式中的其它普通屬性,和其它類函式中的同名屬性并不是共享的,除非也使用佇列或者其它共享方式,錯誤用法示例如下:
import threading import time from multiprocessing import Process, Queue class TestClass: def __init__(self, q): self.q = q self.task_done = False def f1(self): i = 0 while i < 5: self.q.put('hello') time.sleep(0.3) i += 1 self.task_done = True def f2(self): # while死回圈了 while not self.q.empty() or not self.task_done: # self.task_done永遠為True try: print(self.q.get_nowait()) except Exception: pass def run(self): thread = threading.Thread(target=self.f1, name="f1") thread.start() p = Process(target=self.f2) p.start() if __name__ == '__main__': q = Queue() obj = TestClass(q) obj.run()
正確做法如下:
import threading import time from multiprocessing import Process, Queue, active_children, Value class TestClass: def __init__(self, q, task_done): self.q = q self.task_done = task_done def f1(self): i = 0 while i < 5: self.q.put('hello') time.sleep(0.3) i += 1 self.task_done.value = https://www.cnblogs.com/shouke/archive/2023/06/19/1 def f2(self): item ='' while not self.q.empty() or self.task_done.value =https://www.cnblogs.com/shouke/archive/2023/06/19/= 0: try: item = self.q.get_nowait() print(item) except Exception: pass def run(self): thread = threading.Thread(target=self.f1, name="f1") thread.start() p = Process(target=self.f2) p.start() if __name__ == '__main__': q = Queue() task_done = Value('h', 0) obj = TestClass(q, task_done) obj.run()
或者
-
管道
multiprocessing.Pipe
函式回傳一對由管道連接的連接物件,默認情況下管道是雙向的,例如:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
multiprocessing.Pipe
回傳的兩個連接物件表示管道的兩端,每個連接物件都有multiprocessing.connection.send
和multiprocessing.connection.recv()
方法(以及其他方法),請注意,如果兩個行程(或執行緒)試圖同時讀取或寫入管道的同一端,則管道中的資料可能會被破壞,當然,同時使用不同管道末端的行程不會有破壞資料的風險,
行程同步
multiprocessing
包含來自threading
中所有同步原語的等效項,例如,可以使用鎖來確保一次只有一個行程列印到標準輸出:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
行程之間共享狀態
如上所述,在進行并發編程時,通常最好盡可能避免使用共享狀態,當使用多個行程時尤其如此,
但是,如果您確實需要使用一些共享資料,那么multiprocessing
提供了幾種方法
共享記憶體
可以使用multiprocessing.Value
或multiprocessing.Array
將資料存盤在共享記憶體映射中,例如,以下代碼
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = https://www.cnblogs.com/shouke/archive/2023/06/19/3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ =='__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value) # 輸出:3.1415927
print(arr[:]) # 輸出:[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
創建num
和arr
時使用的'd'
和'i'
引數是陣列模塊使用的型別代碼:'d'
表示雙精度浮點,'i'
表示有符號整數,這些共享物件將是行程和執行緒安全的,
為了在使用共享記憶體時獲得更大的靈活性,可以使用multiprocessing.sharedtypes
模塊,該模塊支持創建從共享記憶體分配的任意ctypes
物件,
服務器行程(Server Process)
Manager()
回傳的管理器物件控制一個服務器行程,該行程可保存Python物件,并允許其他行程使用代理操作它們,
管理器物件回傳的管理器支持型別 list
, dict
, multiprocessing.managers.Namespace
, multiprocessing.Lock
, multiprocessing.RLock
, multiprocessing.Semaphore
, multiprocessing.BoundedSemaphore
, multiprocessing.Condition
, multiprocessing.Event
, multiprocessing.Barrier
, multiprocessing.Queue
, multiprocessing.Value
和multiprocessing.Array
,例如
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d) # 輸出:{1: '1', '2': 2, 0.25: None}
print(l) # 輸出:[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
服務器行程管理器比使用共享記憶體物件更靈活,因為它們可以支持任意物件型別,此外,單個管理器可以由不同計算機上的行程通過網路共享,然而,它們比使用共享記憶體要慢,
使用行程池
Pool
類代表一個作業行程池,它具有允許以幾種不同方式將任務轉移給作業行程的方法,
例如:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import os
from multiprocessing import Pool, TimeoutError
def f(x):
return x*x
if __name__ == '__main__':
# 啟動 4 個作業行程
with Pool(processes=4) as pool:
# 輸出 "[0, 1, 4,..., 81]"
print(pool.map(f, range(10))) # 輸出:[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# 注意,此時采用的同步行,雖然是多行程,也要代碼全部執行完成才會繼續往下執行
# 按任意順序列印相同數字
print('列印相同數字')
for i in pool.imap_unordered(f, range(10)):
print(i)
# 異步計算“f(20)”
print('異步計算“f(20)”')
res = pool.apply_async(f, (20,)) # 僅在一個行程中運行
print(res.get(timeout=1)) # 列印 "400"
# 異步計算 "os.getpid()"
print('異步計算 "os.getpid()"')
res = pool.apply_async(os.getpid, ()) # 僅在一個行程中運行
print(res.get(timeout=1)) # 列印行程ID
# 異步啟動多個計算,可能使用更多行程
print('異步啟動多個計算')
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# 讓單個worker行程休眠10秒
print('讓單個worker行程休眠10秒')
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("遇到multiprocessing.TimeoutError")
print("此時,pool仍可用于更多的作業")
# 退出 with 代碼塊,pool就停用了
print("現在,pool已關閉,并且不再可用")
輸出:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
列印相同數字
0
1
4
9
16
25
36
49
64
81
異步計算“f(20)”
400
異步計算 "os.getpid()"
13556
異步啟動多個計算
[13556, 13556, 13556, 13556]
讓單個worker行程休眠10秒
遇到multiprocessing.TimeoutError
此時,pool仍可用于更多的作業
現在,pool已關閉,并且不再可用
請注意,池的方法只能由創建池的行程使用,
此程式包中的功能要求
__main__
模塊可由子級匯入,這意味著一些示例,如multiprocessing.pool.pool
示例將無法在互動式解釋器中作業,例如>>> from multiprocessing import Pool >>> p = Pool(5) >>> def f(x): ... return x*x ... >>> p.map(f, [1,2,3]) Process SpawnPoolWorker-6: Process SpawnPoolWorker-7: Process SpawnPoolWorker-5: Traceback (most recent call last): ... AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)> AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>
(如果你嘗試這樣做,它實際上會以半隨機的方式輸出三個交錯的完整traceback,然后你可能不得不以某種方式停止主行程,)
API參考
multiprocessing
包大部分復制執行緒模塊的API,
multiprocessing.Process
和exception
Process
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
Process
物件表示在立行程中運行的活動,Process
類具有threading.Thread
的所有方法的等價項,
建構式應始終使用關鍵字引數呼叫,
-
group
應始終為None
,它的存在只是為了與threading.Thread.target
兼容, -
target
供run()
方法呼叫的可呼叫物件,默認為None
,表示不呼叫任何內容, -
name
是行程名稱, -
args
是target
呼叫的引數元組, -
kwargs
是target
呼叫的關鍵字引數字典, -
daemon
用于設定將行程是否為守護行程,True
- 是 或False
- 否,如果為None
(默認值),則將從創建行程中繼承,
默認情況下,不會向target
傳遞任何引數,
如果子類重寫建構式,則必須確保在對行程執行其他操作之前呼叫基類建構式(Process.__init__()
),
在版本3.3中更改:添加daemon
引數
-
run()
表示行程活動的方法,
可以在子類中重寫此方法,標準run()方法呼叫作為target引數傳遞給物件建構式的可呼叫物件(如果有的話),其中順序引數和關鍵字引數分別取自
args
和kwargs
引數 -
start()
啟動行程活動,沒改行程對下最多只能呼叫一次, 它安排在單獨的行程中呼叫物件的
run()
方法, -
join([timeout])
如果可選引數
timeout
為None
(默認值),則該方法將阻塞,直到呼叫其join()
方法的行程終止為止,如果timeout
是一個正數,則表示最多阻塞timeout
引數指定的秒數,請注意,如果該方法的行程終止或方法超時,則該方法將回傳None
,檢查行程的退出碼以確定它是否已終止,一個行程可以被
join
多次,注意:阻塞表示不繼續往下執行,如果阻塞超時,程式繼續往下還行,如果此時
target
未運行完成,主程式會等待其運行完成后才終止,行程不能
join
自身,因為這會導致死鎖,在行程啟動之前嘗試join
行程是錯誤的, -
name
行程的名稱,一個字串,僅用于識別目的,它沒有語意,多個行程可能被賦予相同的名稱,初始名稱由建構式設定,如果沒有向建構式提供顯式名稱,則行程名被構造為形如
Process-N1:N2:…:Nk
字串,其中每個Nk
是其父行程的第N個子節點, -
is_alive()
回傳行程是否還存活大致上,行程物件從
start()
方法回傳的那一刻起一直處于活動狀態,直到子行程終止, -
daemon
行程的守護行程標志,一個布林值,這必須在呼叫
start()
之前設定,初始值是從創建行程時繼承的,
當行程退出時,它會嘗試終止其所有守護行程子行程,
請注意,守護行程不允許創建子行程,否則,如果守護行程在其父行程退出時被終止,它的子行程將成為孤兒行程,此外,這些不是Unix守護行程或服務,它們是正常行程,如果非守護行程退出,它們將被終止(而不是被
join
),
除了threading.Thread
API之外,Process
物件還支持以下屬性和方法:
-
pid
回傳行程ID,行程派生之前,其值為None
-
exitcode
子行程的退出碼,如果行程尚未終止,則其值為None
,負值-N
表示子行程被信號N終止, -
terminate()
終止行程,在Unix上,這是使用SIGTERM信號完成的;在Windows上使用TerminateProcess()
,請注意,退出handler和和finally子句等將不會被執行,請注意,行程的子行程不會被終止,它們只會成為孤兒行程
-
..略,更多參考請查閱官方檔案
示例
Process
的一些方法的示例用法
import multiprocessing, time, signal
p = multiprocessing.Process(target=time.sleep, args=(1000,))
print(p, p.is_alive()) # 輸出:<Process(Process-1, initial)> False
p.start()
print(p, p.is_alive()) # 輸出:<Process(Process-1, started)> True
p.terminate()
time.sleep(0.1)
print(p, p.is_alive()) # 輸出:<Process(Process-1, stopped[SIGTERM])> False
print(p.exitcode == -signal.SIGTERM) # 輸出:True
例外
-
exception
multiprocessing.ProcessError
所有
multiprocessing
例外的基類 -
exception
multiprocessing.BufferTooShort
當提供的緩沖區物件太小而無法讀取訊息時引發的例外,
-
exception
multiprocessing.AuthenticationError
發生身份驗證錯誤時引發的例外
-
exception
multiprocessing.TimeoutError
具有
timeout
的方法超時引發的例外,
管道和佇列
-
class
multiprocessing.Pipe([duplex])
回傳一對表示管道終端的
multiprocessing.Connection
物件(conn1,conn2)
,如果duplex
為True
(默認值),則管道為雙向管道,如果duplex
為False
,則管道是單向的:conn1
只能用于接收訊息,conn2
只能用于發送訊息 -
class
multiprocessing.Queue([maxsize])
回傳使用管道和一些鎖/信號量實作的行程共享佇列,當行程第一次將專案放入佇列時,會啟動一個feeder執行緒,該執行緒將物件從緩沖區傳輸到管道中,來自標準庫的
queue
模塊的常見queue.Empty
和queue.Full
例外被引發以發出超時信號,multiprocessing.Queue
實作了Queue.Queue
的所有方法,除了task_done()
和join()
-
qsize()
回傳佇列的大致大小,由于多執行緒/多行程的語意,這是不可靠的,
請注意,這可能會在Unix平臺(如Mac OS X)上觸發
NotImplementedError
,因為其未實作sem_getvalue()
, -
empty()
如果佇列為空,則回傳
True
,否則回傳False
,由于多執行緒/多處理語意的原因,這是不可靠的, -
full()
如果佇列已滿,則回傳
True
,否則回傳False
,由于多執行緒/多處理語意的原因,這是不可靠的, -
put(obj[, block[, timeout]])
將obj放入佇列,如果可選引數
block
為True
(默認值),并且timeout
為None
(默認值),則必要時阻塞,直到有可用空閑slot,如果timeout
是一個正數,最多會阻塞timeout
指定秒數,并拋出queue.Full
例外,如果在該時間內沒有可用slot的話,如果block
為False
,如果有可用空閑slot,則將專案放入佇列中,否則拋出queue.Full
例外(在這種情況下會忽略timeout
), -
put_nowait(obj)
等價于put(obj, False)
-
get([block[, timeout]])
從佇列中洗掉并回傳被洗掉專案,如果引數
block
為True
(默認值),并且timeout
為None
(默認值),則獲取不到專案時阻塞,直到有可獲取項,如果timeout
是一個正數,最多會阻塞timeout
指定秒數,并拋出queue.Empty
例外,如果在超時時間內沒有可用專案的話,如果block
為False
,如果有可獲取項,則立即回傳專案,否則拋出queue.Empty
例外(在這種情況下會忽略timeout
), -
get_nowait()
等價于get(False)
-
..略,更多參考請查閱官方檔案
...略,更多參考請查閱官方檔案
雜項
-
multiprocessing.active_children()
回傳當前行程的所有活動子行程的串列,呼叫該方法的副作用是“阻塞”任何已經完成的行程(原文:Calling this has the side effect of “joining” any processes which have already finished,)
-
multiprocessing.cpu_count()
回傳系統的CPU數量,該數量并不等于當前行程可以使用的CPU數量,可用cpu的數量可以通過
len(os.sched_getaffinity(0))
獲取,不過可能會拋NotImplementedError
例外, -
multiprocessing.``current_process
()回傳當前行程對應的
multiprocessing.Process
對下,類似threading.current_thread()
-
multiprocessing.get_all_start_methods()
回傳支持的啟動方法的串列,其中第一個是默認方法,可能的啟動方法有
'fork'
,'spawn'
和'forkserver'
,在Windows上,僅'spawn'
可用,在Unix上,始終支持'fork'
和'spawn'
,默認值為“'fork'
,3.4版新增
-
multiprocessing.get_start_method(allow_none=False)
回傳用于啟動行程的啟動方法的名稱,如果尚未設定啟動方法,且allow_none為
False
,則回傳默認方法名詞,如果尚未設定啟動方法,并且allow_none為True
,則回傳None
,回傳值可以是'fork'
,'spawn'
,'forkserver'
或None
.'fork'
為Unix上的默認值,而'spawn'
則是Windows上的默認值,3.4版新增,
-
multiprocessing.``set_start_method
(method)設定應用于啟動子行程的方法,method可以是
'fork'
,'spawn'
或'forkserver'
,請注意,最多只能呼叫一次,并且應該在主模塊的if__name__=='__main__'
子句中使用,3.4版新增,
-
..略,更多參考請查閱官方檔案
..略,更多參考請查閱官方檔案
Process工具
可以創建一個行程池,用于執行使用multiprocessing.pool.Pool
類提交給它的任務,
Pool類
-
class
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
一個行程池物件,用于控制可以向其提交作業的作業行程池,它支持帶有超時和回呼的異步結果,并具有并行map實作,
processes
是要使用的作業行程的數量,如果processes
為None
,則默認使用os.cpu_count()
回傳的數字,initializer
如果值不為None
,那么每個作業行程在啟動時都會呼叫initializer(*initargs)
,maxtasksperchild
是作業行程在退出并替換為新的作業行程之前可以完成的任務數,以便釋放未使用的資源,默認的maxtasksperchild
為None
,這意味著作業行程存活時間將與行程池一樣長,context
用于指定用于啟動作業行程的背景關系,通常,行程池是使用背景關系物件的函式multiprocessing.Pool()
或Pool()
方法創建的,在這兩種情況下,背景關系都設定得適當,
請注意,池物件的方法只能由創建池的行程呼叫,
3.2版新增:
maxtasksperchild
3.4版新增:
context
注意:
池中的作業行程通常在作業佇列的整個持續時間內保持存活,在其他系統(如Apache、mod_wsgi等)中發現的一種釋放作業行程所持有資源的常見模式是,允許池中的作業行程在退出、清理和生成新行程以取代舊行程之前只完成一定數量的作業,池的
maxtasksperchild
引數向最終用戶暴露了這一能力,apply(func[, args[, kwds]])
使用引數
args
和關鍵字引數kwds
呼叫func,它會阻塞,直到可獲取結果為止,考慮到阻塞問題,apply_async()
更適合并行執行作業,此外,func
只在池的一個作業行程中執行,apply_async(func[, args[, kwds[, callback[, error_callback]]]])
apply()
方法的變體,回傳結果物件,如果指定了
callback
,那么它應該是一個接受單個引數的可呼叫函式,當可獲取結果時,將對其應用callback
,除非呼叫失敗,在這種情況下,將對其應用error_callback
,如果指定了
error_callback
,那么它應該是一個接受單個引數的可呼叫函式,如果目標函式失敗,則會使用例外實體呼叫error_callback
,回呼應該立即完成,否則處理結果的執行緒將被阻塞,
map(func, iterable[, chunksize])
內置函式map()的并行等價物(不過它只支持一個
iterable
引數),它會阻塞,直到可獲取結果,該方法將
iterable
分割為多個塊,并將這些塊作為單獨的任務提交給行程池,可以通過將chunksize
設定為正整數來指定這些塊的(近似)大小,map_async(func, iterable[, chunksize[, callback[, error_callback]]])
map()
方法的一個變體,它回傳一個結果物件,如果指定了
callback
,那么它應該是一個接受單個引數的可呼叫函式,當可獲取結果時,將對其應用callback
,除非呼叫失敗,在這種情況下,將應用error_callback
,如果指定了
error_callback
,那么它應該是一個接受單個引數的可呼叫函式,如果目標函式失敗,則會使用例外實體呼叫error_callback
,回呼應該立即完成,否則處理結果的執行緒將被阻塞,
imap(func, iterable[, chunksize])
map()
的一個更惰性版本,chunksize
引數與map()
方法使用的引數相同,對于非常長的迭代,使用較大的chunksize
值可以使作業比使用默認值1更快地完成,此外,如果
chunksize
為1,則imap()
方法回傳的迭代器的next()
方法有一個可選的timeout
引數:如果無法在timeout
秒內回傳結果,next(timeout)
將引發multiprocessing.TimeoutError
imap_unordered(func, iterable[, chunksize])
與imap()
相同,只是回傳迭代器的結果的順序是任意的,(只有當只有一個作業行程時,才能保證順序“正確”)starmap(func, iterable[, chunksize])
類似于
map()
,只是iterable的元素被當做引數,不拆解,因此,[(1,2), (3,4)]的迭代結果是[func(1,2),func(3,4)],
3.3版新增,
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
starma()
和map_async()
的組合,對可迭代項中的可迭代項進行迭代,并在未拆解可迭代項的情況下呼叫func,回傳一個結果物件,3.3版新增,
close()
阻止將更多任務提交到行程池中,完成所有任務后,作業行程將退出,terminate()
在未完成未完成的作業的情況下立即停止作業行程,當行程池物件被垃圾回收時,將立即呼叫
terminate()
,join()
等待作業行程退出,在使用
join()
之前,必須呼叫close()
或terminate()
,3.3版新增:行程池物件現在支持背景關系管理協議——請參閱背景關系管理器型別
__enter__()
回傳池物件,__exit_()
呼叫terminate()
AsyncResult
類
-
class
multiprocessing.pool.AsyncResult
Pool.apply_async()和Pool.map_async()
回傳的結果類,
get([timeout])
當結果已準備好時回傳結果,如果timeout
不是None
,并且沒有在timeout
秒內獲取到結果,則會引發multiprocessing.TimeoutError
,如果遠程呼叫引發了例外,則該例外將由get()
重新拋出,
wait([timeout])
等待,直到結果可獲取,或者直到超過timeout
秒,
ready()
回傳呼叫是否完成
successful()
回傳呼叫是否已完成,不引發例外,如果結果還未準備好,將引發AssertionError
,
行程池使用示例
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
...略
作者:授客
微信/QQ:1033553122
全國軟體測驗QQ交流群:7156436
Git地址:https://gitee.com/ishouke
友情提示:限于時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額隨意,您的支持將是我繼續創作的源動力,打賞后如有任何疑問,請聯系我!!!
微信打賞
支付寶打賞 全國軟體測驗交流QQ群
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/555555.html
標籤:其他
上一篇:Manacher演算法學習筆記
下一篇:返回列表