主頁 > 後端開發 > Python 標準類別庫-并發執行之multiprocessing-基于行程的并行

Python 標準類別庫-并發執行之multiprocessing-基于行程的并行

2023-06-20 08:05:22 後端開發

實踐環境

Python3.6

介紹

multiprocessing是一個支持使用類似于執行緒模塊的API派生行程的包,該包同時提供本地和遠程并發,通過使用子行程而不是執行緒,有效地避開了全域解釋器鎖,因此,multiprocessing模塊允許程式員充分利用給定機器上的多個處理器,它同時在Unix和Windows上運行,

該模塊還引入了在執行緒模塊中沒有類似程式的API,這方面的一個主要例子是Pool物件,它提供了一種方便的方法,可以在多個輸入值的情況下,為行程之間分配輸入資料(資料并行),實作并行執行函式,以下示例演示了在模塊中定義此類函式,以便子行程能夠成功匯入該模塊的常見做法,這個使用Pool實作資料并行的基本示例

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

控制臺輸出:

[1, 4, 9]

Process類

multiprocessing中,行程是通過創建一個Process類并呼叫其start()方法來派生的,Process遵循threading.Thread的API,multiprocess程式的一個微小的例子:

from multiprocessing import Process

def f(name):
    print('hello', name) # 輸出:hello shouke

if __name__ == '__main__':
    p = Process(target=f, args=('shouke',))
    p.start()
    p.join()

下面是一個擴展示例,顯示所涉及的各個行程ID:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('shouke',))
    p.start()
    p.join()

控制臺輸出:

main line
module name: __main__
parent process: 13080
process id: 20044
function f
module name: __mp_main__
parent process: 20044
process id: 28952
hello shouke

背景關系和啟動方法

根據平臺的不同,multiprocessing支持三種啟動行程的方式,這些啟動方法是

  • spawn

    父行程啟動一個新的python解釋器行程,子行程將只繼承那些運行行程物件run()方法所需的資源,特別是,來自父行程的不必要的檔案描述符和句柄將不會被繼承,與使用forkforkserver相比,使用此方法啟動行程相當慢,可在Unix和Windows上使用,Windows上默認使用該啟動方法,

  • fork

    父行程使用os.fork()來fork Python解釋器,子行程在開始時實際上與父行程相同,父行程的所有資源都由子行程繼承,請注意,安全地fork多執行緒行程是有問題的,僅在Unix上可用,Unix上默認會用該方法,

  • forkserver

    當程式啟動并選擇forkserver啟動方法時,服務器行程就會啟動,從那時起,每當需要新行程時,父行程都會連接到服務器,并請求它fork一個新行程,fork服務器行程是單執行緒的,因此使用os.fork()是安全的,不會繼承不必要的資源,在支持通過Unix管道傳遞檔案描述符的Unix平臺上可用,

To select a start method you use the set_start_method() in the if __name__ == '__main__' clause of the main module. For example

在3.4版本中進行了更改:在所有unix平臺上添加了spawn,并為一些unix平臺添加了forkserver,子行程不再繼承Windows上的所有父級可繼承句柄,

在Unix上,使用spawnforkserver啟動方法還將啟動一個信號量跟蹤器行程,該行程跟蹤程式行程創建的未鏈接的命名信號量,當所有行程都退出時,信號量跟蹤器將取消任何剩余信號量的鏈接,通常應該沒有剩余信號量,但如果一個行程被信號殺死,可能會有一些“泄露”的信號量,(取消命名信號量的鏈接是一個嚴重的問題,因為系統只允許有限的數量,并且在下次重新啟動之前不會自動取消鏈接,)

要選擇啟動方法,請在主模塊的 if __name__ == '__main__'子句中使用set_start_method(),例如

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get()) # 輸出 hello
    p.join()

set_start_method() 在一個程式中只能用一次

或者,也可以使用get_context()來獲取背景關系物件,背景關系物件與multiprocessing模塊具有相同的API,并允許在同一程式中使用多個啟動方法,

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

請注意,與一個背景關系相關的物件可能與不同背景關系的行程不兼容,特別是,使用fork背景關系創建的鎖不能傳遞給使用spawnforkserver啟動方法啟動的行程,

想要使用特定啟動方法的庫可能應該使用get_context()來避免干擾庫用戶的選擇

在行程之間交換物件

multiprocessing支持行程之間的兩種通信信道

  • 佇列

    multiprocessing.Queue 類近乎是queue.Queue的克隆. 例如:

    from multiprocessing import Process, Queue
    
    def f(q):
        q.put([42, None, 'hello'])
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()
    

    佇列是執行緒和行程安全的,

    錯誤用法示例如下:

    from multiprocessing import Process, Queue
    
    q = Queue()
    
    def f():
        global q
        q.put([42, None, 'hello'])
    
    if __name__ == '__main__':
        p = Process(target=f)
        p.start()
        print(q.get())    # 取不到值
        p.join()
    

    涉及到類的時候咋處理呢?示例如下

    from multiprocessing import Process, Queue
    
    class TestClass:
        def __init__(self, q):
            self.q = q
    
        def f(self):
            self.q.put([42, None, 'hello'])
    
    if __name__ == '__main__':
        q = Queue()
        obj = TestClass(q)
        p = Process(target=obj.f)
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()
    

    或者

    from multiprocessing import Process, Queue
    
    q = Queue()
    
    class TestClass:
        def f(self, q):
            q.put([42, None, 'hello'])
    
    
    if __name__ == '__main__':
        q = Queue()
        obj = TestClass()
        p = Process(target=obj.f, args=(q,))
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()
    

    特別需要注意的是,由行程呼叫的target類函式中的其它普通屬性,和其它類函式中的同名屬性并不是共享的,除非也使用佇列或者其它共享方式,錯誤用法示例如下:

    import threading
    import time
    
    from multiprocessing import Process, Queue
    
    class TestClass:
        def __init__(self, q):
            self.q = q
            self.task_done = False
    
        def f1(self):
            i = 0
            while i < 5:
                self.q.put('hello')
                time.sleep(0.3)
                i += 1
            self.task_done = True
    
        def f2(self):
            
            # while死回圈了
            while not self.q.empty() or not self.task_done: # self.task_done永遠為True
                try:
                    print(self.q.get_nowait())
                except Exception:
                    pass
    
        def run(self):
            thread = threading.Thread(target=self.f1,
                                      name="f1")
            thread.start()
    
            p = Process(target=self.f2)
            p.start()
    
    
    if __name__ == '__main__':
        q = Queue()
        obj = TestClass(q)
        obj.run()
    

    正確做法如下:

    import threading
    import time
    
    from multiprocessing import Process, Queue, active_children, Value
    
    class TestClass:
        def __init__(self, q, task_done):
            self.q = q
            self.task_done = task_done
    
        def f1(self):
            i = 0
            while i < 5:
                self.q.put('hello')
                time.sleep(0.3)
                i += 1
            self.task_done.value = https://www.cnblogs.com/shouke/archive/2023/06/19/1
    
        def f2(self):
            item =''
            while not self.q.empty() or self.task_done.value =https://www.cnblogs.com/shouke/archive/2023/06/19/= 0:
                try:
                    item = self.q.get_nowait()
                    print(item)
                except Exception:
                    pass
    
        def run(self):
            thread = threading.Thread(target=self.f1,
                                      name="f1")
            thread.start()
    
            p = Process(target=self.f2)
            p.start()
    
    
    if __name__ == '__main__':
        q = Queue()
        task_done = Value('h', 0)
        obj = TestClass(q, task_done)
        obj.run()
    

    或者

  • 管道

    multiprocessing.Pipe函式回傳一對由管道連接的連接物件,默認情況下管道是雙向的,例如:

    from multiprocessing import Process, Pipe
    
    def f(conn):
        conn.send([42, None, 'hello'])
        conn.close()
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())   # prints "[42, None, 'hello']"
        p.join()
    

    multiprocessing.Pipe回傳的兩個連接物件表示管道的兩端,每個連接物件都有multiprocessing.connection.sendmultiprocessing.connection.recv() 方法(以及其他方法),請注意,如果兩個行程(或執行緒)試圖同時讀取或寫入管道的同一端,則管道中的資料可能會被破壞,當然,同時使用不同管道末端的行程不會有破壞資料的風險,

行程同步

multiprocessing包含來自threading中所有同步原語的等效項,例如,可以使用鎖來確保一次只有一個行程列印到標準輸出:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

行程之間共享狀態

如上所述,在進行并發編程時,通常最好盡可能避免使用共享狀態,當使用多個行程時尤其如此,

但是,如果您確實需要使用一些共享資料,那么multiprocessing提供了幾種方法

共享記憶體

可以使用multiprocessing.Valuemultiprocessing.Array將資料存盤在共享記憶體映射中,例如,以下代碼

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = https://www.cnblogs.com/shouke/archive/2023/06/19/3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ =='__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value) # 輸出:3.1415927
    print(arr[:]) # 輸出:[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

創建numarr時使用的'd''i'引數是陣列模塊使用的型別代碼:'d'表示雙精度浮點,'i'表示有符號整數,這些共享物件將是行程和執行緒安全的,

為了在使用共享記憶體時獲得更大的靈活性,可以使用multiprocessing.sharedtypes模塊,該模塊支持創建從共享記憶體分配的任意ctypes物件,

服務器行程(Server Process)

Manager()回傳的管理器物件控制一個服務器行程,該行程可保存Python物件,并允許其他行程使用代理操作它們,

管理器物件回傳的管理器支持型別 list, dict, multiprocessing.managers.Namespace, multiprocessing.Lock, multiprocessing.RLock, multiprocessing.Semaphore, multiprocessing.BoundedSemaphore, multiprocessing.Condition, multiprocessing.Event, multiprocessing.Barrier, multiprocessing.Queue, multiprocessing.Valuemultiprocessing.Array,例如

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d) # 輸出:{1: '1', '2': 2, 0.25: None}
        print(l) # 輸出:[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服務器行程管理器比使用共享記憶體物件更靈活,因為它們可以支持任意物件型別,此外,單個管理器可以由不同計算機上的行程通過網路共享,然而,它們比使用共享記憶體要慢,

使用行程池

Pool類代表一個作業行程池,它具有允許以幾種不同方式將任務轉移給作業行程的方法,

例如:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import os
from multiprocessing import Pool, TimeoutError

def f(x):
    return x*x

if __name__ == '__main__':
    # 啟動 4 個作業行程
    with Pool(processes=4) as pool:
        # 輸出 "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10))) # 輸出:[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
        # 注意,此時采用的同步行,雖然是多行程,也要代碼全部執行完成才會繼續往下執行

        # 按任意順序列印相同數字
        print('列印相同數字')
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # 異步計算“f(20)”
        print('異步計算“f(20)”')
        res = pool.apply_async(f, (20,))      # 僅在一個行程中運行
        print(res.get(timeout=1))             # 列印 "400"

        # 異步計算 "os.getpid()"
        print('異步計算 "os.getpid()"')
        res = pool.apply_async(os.getpid, ()) # 僅在一個行程中運行
        print(res.get(timeout=1))             # 列印行程ID

        # 異步啟動多個計算,可能使用更多行程
        print('異步啟動多個計算')
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # 讓單個worker行程休眠10秒
        print('讓單個worker行程休眠10秒')
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("遇到multiprocessing.TimeoutError")

        print("此時,pool仍可用于更多的作業")

    # 退出 with 代碼塊,pool就停用了
    print("現在,pool已關閉,并且不再可用")

輸出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
列印相同數字
0
1
4
9
16
25
36
49
64
81
異步計算“f(20)”
400
異步計算 "os.getpid()"
13556
異步啟動多個計算
[13556, 13556, 13556, 13556]
讓單個worker行程休眠10秒
遇到multiprocessing.TimeoutError
此時,pool仍可用于更多的作業
現在,pool已關閉,并且不再可用

請注意,池的方法只能由創建池的行程使用,

此程式包中的功能要求 __main__模塊可由子級匯入,這意味著一些示例,如multiprocessing.pool.pool示例將無法在互動式解釋器中作業,例如

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process SpawnPoolWorker-6:
Process SpawnPoolWorker-7:
Process SpawnPoolWorker-5:
Traceback (most recent call last):
...
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>

(如果你嘗試這樣做,它實際上會以半隨機的方式輸出三個交錯的完整traceback,然后你可能不得不以某種方式停止主行程,)

API參考

multiprocessing包大部分復制執行緒模塊的API,

multiprocessing.Processexception

Process
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) 

Process物件表示在立行程中運行的活動,Process類具有threading.Thread的所有方法的等價項,

建構式應始終使用關鍵字引數呼叫,

  • group 應始終為None,它的存在只是為了與threading.Thread.target兼容,

  • targetrun()方法呼叫的可呼叫物件,默認為None,表示不呼叫任何內容,

  • name 是行程名稱,

  • argstarget呼叫的引數元組,

  • kwargstarget呼叫的關鍵字引數字典,

  • daemon 用于設定將行程是否為守護行程,True - 是 或False - 否,如果為None(默認值),則將從創建行程中繼承,

默認情況下,不會向target傳遞任何引數,

如果子類重寫建構式,則必須確保在對行程執行其他操作之前呼叫基類建構式(Process.__init__()),

在版本3.3中更改:添加daemon引數

  • run()

    表示行程活動的方法,

    可以在子類中重寫此方法,標準run()方法呼叫作為target引數傳遞給物件建構式的可呼叫物件(如果有的話),其中順序引數和關鍵字引數分別取自argskwargs引數

  • start()
    啟動行程活動,

    沒改行程對下最多只能呼叫一次, 它安排在單獨的行程中呼叫物件的run()方法,

  • join([timeout])

    如果可選引數timeoutNone(默認值),則該方法將阻塞,直到呼叫其join()方法的行程終止為止,如果timeout是一個正數,則表示最多阻塞timeout引數指定的秒數,請注意,如果該方法的行程終止或方法超時,則該方法將回傳None,檢查行程的退出碼以確定它是否已終止,

    一個行程可以被join多次,

    注意:阻塞表示不繼續往下執行,如果阻塞超時,程式繼續往下還行,如果此時target未運行完成,主程式會等待其運行完成后才終止,

    行程不能join自身,因為這會導致死鎖,在行程啟動之前嘗試join行程是錯誤的,

  • name
    行程的名稱,一個字串,僅用于識別目的,它沒有語意,多個行程可能被賦予相同的名稱,

    初始名稱由建構式設定,如果沒有向建構式提供顯式名稱,則行程名被構造為形如Process-N1:N2:…:Nk字串,其中每個Nk是其父行程的第N個子節點,

  • is_alive()
    回傳行程是否還存活

    大致上,行程物件從start()方法回傳的那一刻起一直處于活動狀態,直到子行程終止,

  • daemon

    行程的守護行程標志,一個布林值,這必須在呼叫start()之前設定,

    初始值是從創建行程時繼承的,

    當行程退出時,它會嘗試終止其所有守護行程子行程,

    請注意,守護行程不允許創建子行程,否則,如果守護行程在其父行程退出時被終止,它的子行程將成為孤兒行程,此外,這些不是Unix守護行程或服務,它們是正常行程,如果非守護行程退出,它們將被終止(而不是被join),

除了threading.Thread API之外,Process物件還支持以下屬性和方法:

  • pid
    回傳行程ID,行程派生之前,其值為None

  • exitcode
    子行程的退出碼,如果行程尚未終止,則其值為None,負值-N表示子行程被信號N終止,

  • terminate()
    終止行程,在Unix上,這是使用SIGTERM信號完成的;在Windows上使用TerminateProcess(),請注意,退出handler和和finally子句等將不會被執行,

    請注意,行程的子行程不會被終止,它們只會成為孤兒行程

  • ..略,更多參考請查閱官方檔案

示例

Process的一些方法的示例用法

import multiprocessing, time, signal

p = multiprocessing.Process(target=time.sleep, args=(1000,))
print(p, p.is_alive()) # 輸出:<Process(Process-1, initial)> False
p.start()
print(p, p.is_alive()) # 輸出:<Process(Process-1, started)> True
p.terminate()
time.sleep(0.1)
print(p, p.is_alive()) # 輸出:<Process(Process-1, stopped[SIGTERM])> False
print(p.exitcode == -signal.SIGTERM) # 輸出:True
例外
  • exception multiprocessing.ProcessError

    所有multiprocessing例外的基類

  • exception multiprocessing.BufferTooShort

    當提供的緩沖區物件太小而無法讀取訊息時引發的例外,

  • exception multiprocessing.AuthenticationError

    發生身份驗證錯誤時引發的例外

  • exception multiprocessing.TimeoutError

    具有timeout的方法超時引發的例外,

管道和佇列

  • class multiprocessing.Pipe([duplex])

    回傳一對表示管道終端的multiprocessing.Connection物件(conn1,conn2),如果duplexTrue(默認值),則管道為雙向管道,如果duplexFalse,則管道是單向的:conn1只能用于接收訊息,conn2只能用于發送訊息

  • class multiprocessing.Queue([maxsize])

    回傳使用管道和一些鎖/信號量實作的行程共享佇列,當行程第一次將專案放入佇列時,會啟動一個feeder執行緒,該執行緒將物件從緩沖區傳輸到管道中,來自標準庫的queue模塊的常見queue.Emptyqueue.Full例外被引發以發出超時信號,multiprocessing.Queue實作了Queue.Queue的所有方法,除了task_done()join()

  • qsize()

    回傳佇列的大致大小,由于多執行緒/多行程的語意,這是不可靠的,

    請注意,這可能會在Unix平臺(如Mac OS X)上觸發NotImplementedError,因為其未實作sem_getvalue()

  • empty()

    如果佇列為空,則回傳True,否則回傳False,由于多執行緒/多處理語意的原因,這是不可靠的,

  • full()

    如果佇列已滿,則回傳True,否則回傳False,由于多執行緒/多處理語意的原因,這是不可靠的,

  • put(obj[, block[, timeout]])

    將obj放入佇列,如果可選引數blockTrue(默認值),并且timeoutNone(默認值),則必要時阻塞,直到有可用空閑slot,如果timeout是一個正數,最多會阻塞timeout指定秒數,并拋出queue.Full例外,如果在該時間內沒有可用slot的話,如果blockFalse,如果有可用空閑slot,則將專案放入佇列中,否則拋出queue.Full例外(在這種情況下會忽略timeout),

  • put_nowait(obj)
    等價于put(obj, False)

  • get([block[, timeout]])

    從佇列中洗掉并回傳被洗掉專案,如果引數blockTrue(默認值),并且timeoutNone(默認值),則獲取不到專案時阻塞,直到有可獲取項,如果timeout是一個正數,最多會阻塞timeout指定秒數,并拋出queue.Empty例外,如果在超時時間內沒有可用專案的話,如果blockFalse,如果有可獲取項,則立即回傳專案,否則拋出queue.Empty例外(在這種情況下會忽略timeout),

  • get_nowait()
    等價于get(False)

  • ..略,更多參考請查閱官方檔案

...略,更多參考請查閱官方檔案

雜項

  • multiprocessing.active_children()

    回傳當前行程的所有活動子行程的串列,呼叫該方法的副作用是“阻塞”任何已經完成的行程(原文:Calling this has the side effect of “joining” any processes which have already finished,)

  • multiprocessing.cpu_count()

    回傳系統的CPU數量,該數量并不等于當前行程可以使用的CPU數量,可用cpu的數量可以通過len(os.sched_getaffinity(0))獲取,不過可能會拋NotImplementedError例外,

  • multiprocessing.``current_process()

    回傳當前行程對應的multiprocessing.Process對下,類似threading.current_thread()

  • multiprocessing.get_all_start_methods()

    回傳支持的啟動方法的串列,其中第一個是默認方法,可能的啟動方法有'fork', 'spawn''forkserver',在Windows上,僅 'spawn'可用,在Unix上,始終支持'fork''spawn',默認值為“'fork'

    3.4版新增

  • multiprocessing.get_start_method(allow_none=False)

    回傳用于啟動行程的啟動方法的名稱,如果尚未設定啟動方法,且allow_noneFalse,則回傳默認方法名詞,如果尚未設定啟動方法,并且allow_noneTrue,則回傳None,回傳值可以是'fork', 'spawn', 'forkserver'None. 'fork'為Unix上的默認值,而'spawn'則是Windows上的默認值,

    3.4版新增,

  • multiprocessing.``set_start_method(method)

    設定應用于啟動子行程的方法,method可以是 'fork', 'spawn''forkserver',請注意,最多只能呼叫一次,并且應該在主模塊的if__name__=='__main__'子句中使用,

    3.4版新增,

  • ..略,更多參考請查閱官方檔案

..略,更多參考請查閱官方檔案

Process工具

可以創建一個行程池,用于執行使用multiprocessing.pool.Pool類提交給它的任務,

Pool類
  • class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

    一個行程池物件,用于控制可以向其提交作業的作業行程池,它支持帶有超時和回呼的異步結果,并具有并行map實作,

    • processes 是要使用的作業行程的數量,如果processesNone,則默認使用os.cpu_count()回傳的數字,
    • initializer 如果值不為None,那么每個作業行程在啟動時都會呼叫initializer(*initargs)
    • maxtasksperchild 是作業行程在退出并替換為新的作業行程之前可以完成的任務數,以便釋放未使用的資源,默認的maxtasksperchildNone,這意味著作業行程存活時間將與行程池一樣長,
    • context 用于指定用于啟動作業行程的背景關系,通常,行程池是使用背景關系物件的函式multiprocessing.Pool()Pool()方法創建的,在這兩種情況下,背景關系都設定得適當,

    請注意,池物件的方法只能由創建池的行程呼叫,

    3.2版新增:maxtasksperchild

    3.4版新增:context

    注意:

    池中的作業行程通常在作業佇列的整個持續時間內保持存活,在其他系統(如Apache、mod_wsgi等)中發現的一種釋放作業行程所持有資源的常見模式是,允許池中的作業行程在退出、清理和生成新行程以取代舊行程之前只完成一定數量的作業,池的maxtasksperchild引數向最終用戶暴露了這一能力,

    apply(func[, args[, kwds]])

    使用引數args和關鍵字引數kwds呼叫func,它會阻塞,直到可獲取結果為止,考慮到阻塞問題,apply_async()更適合并行執行作業,此外,func只在池的一個作業行程中執行,

    apply_async(func[, args[, kwds[, callback[, error_callback]]]])

    apply()方法的變體,回傳結果物件,

    如果指定了callback,那么它應該是一個接受單個引數的可呼叫函式,當可獲取結果時,將對其應用callback,除非呼叫失敗,在這種情況下,將對其應用error_callback

    如果指定了error_callback,那么它應該是一個接受單個引數的可呼叫函式,如果目標函式失敗,則會使用例外實體呼叫error_callback

    回呼應該立即完成,否則處理結果的執行緒將被阻塞,

    map(func, iterable[, chunksize])

    內置函式map()的并行等價物(不過它只支持一個iterable引數),它會阻塞,直到可獲取結果,

    該方法將iterable分割為多個塊,并將這些塊作為單獨的任務提交給行程池,可以通過將chunksize設定為正整數來指定這些塊的(近似)大小,

    map_async(func, iterable[, chunksize[, callback[, error_callback]]])

    map()方法的一個變體,它回傳一個結果物件,

    如果指定了callback ,那么它應該是一個接受單個引數的可呼叫函式,當可獲取結果時,將對其應用callback,除非呼叫失敗,在這種情況下,將應用error_callback

    如果指定了error_callback,那么它應該是一個接受單個引數的可呼叫函式,如果目標函式失敗,則會使用例外實體呼叫error_callback

    回呼應該立即完成,否則處理結果的執行緒將被阻塞,

    imap(func, iterable[, chunksize])

    map()的一個更惰性版本,

    chunksize引數與map()方法使用的引數相同,對于非常長的迭代,使用較大的chunksize值可以使作業比使用默認值1更快地完成,

    此外,如果chunksize為1,則imap()方法回傳的迭代器的next()方法有一個可選的timeout引數:如果無法在timeout秒內回傳結果,next(timeout)將引發multiprocessing.TimeoutError

    imap_unordered(func, iterable[, chunksize])
    imap()相同,只是回傳迭代器的結果的順序是任意的,(只有當只有一個作業行程時,才能保證順序“正確”)

    starmap(func, iterable[, chunksize])

    類似于map(),只是iterable的元素被當做引數,不拆解,

    因此,[(1,2), (3,4)]的迭代結果是[func(1,2),func(3,4)],

    3.3版新增,

    starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

    starma()map_async()的組合,對可迭代項中的可迭代項進行迭代,并在未拆解可迭代項的情況下呼叫func,回傳一個結果物件,

    3.3版新增,

    close()
    阻止將更多任務提交到行程池中,完成所有任務后,作業行程將退出,

    terminate()

    在未完成未完成的作業的情況下立即停止作業行程,當行程池物件被垃圾回收時,將立即呼叫terminate()

    join()

    等待作業行程退出,在使用join()之前,必須呼叫close()terminate()

    3.3版新增:行程池物件現在支持背景關系管理協議——請參閱背景關系管理器型別__enter__()回傳池物件,__exit_()呼叫terminate()

AsyncResult
  • class multiprocessing.pool.AsyncResult

    Pool.apply_async()和Pool.map_async()回傳的結果類,

get([timeout])
當結果已準備好時回傳結果,如果timeout不是None,并且沒有在timeout秒內獲取到結果,則會引發multiprocessing.TimeoutError,如果遠程呼叫引發了例外,則該例外將由get()重新拋出,

wait([timeout])
等待,直到結果可獲取,或者直到超過timeout秒,

ready()
回傳呼叫是否完成

successful()

回傳呼叫是否已完成,不引發例外,如果結果還未準備好,將引發AssertionError

行程池使用示例
from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

...略

作者:授客
微信/QQ:1033553122
全國軟體測驗QQ交流群:7156436

Git地址:https://gitee.com/ishouke
友情提示:限于時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額隨意,您的支持將是我繼續創作的源動力,打賞后如有任何疑問,請聯系我!!!
           微信打賞                        支付寶打賞                  全國軟體測驗交流QQ群  
              

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/555555.html

標籤:其他

上一篇:Manacher演算法學習筆記

下一篇:返回列表

標籤雲
其他(161276) Python(38242) JavaScript(25505) Java(18249) C(15237) 區塊鏈(8271) C#(7972) AI(7469) 爪哇(7425) MySQL(7258) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5875) 数组(5741) R(5409) Linux(5347) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4603) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2436) ASP.NET(2404) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) .NET技术(1984) HtmlCss(1968) 功能(1967) Web開發(1951) C++(1942) python-3.x(1918) 弹簧靴(1913) xml(1889) PostgreSQL(1881) .NETCore(1863) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Python 標準類別庫-并發執行之multiprocessing-基于行程的并行

    ### 實踐環境 Python3.6 ### 介紹 `multiprocessing`是一個支持使用類似于執行緒模塊的API派生行程的包。該包同時提供本地和遠程并發,通過使用子行程而不是執行緒,有效地避開了全域解釋器鎖。因此,`multiprocessing`模塊允許程式員充分利用給定機器上的多個處理器 ......

    uj5u.com 2023-06-20 08:05:22 more
  • Manacher演算法學習筆記

    # Manacher演算法是什么 ~~Manacher演算法就是馬拉車。~~ Manacher演算法就是用于解決回文子串的個數的。 # 問題引入 [P3805:【模板】manacher 演算法](https://www.luogu.com.cn/problem/P3805) # 題目大意 給出一個只由小寫英 ......

    uj5u.com 2023-06-20 08:00:12 more
  • 前端學習C語言 - 函式和關鍵字

    ## 函式和關鍵字 本篇主要介紹:`自定義函式`、`宏函式`、`字串處理函式`和`關鍵字`。 ### 自定義函式 #### 基本用法 實作一個 add() 函式。請看示例: ```c #include // 自定義函式,用于計算兩個整數的和 int add(int a, int b) { // a ......

    uj5u.com 2023-06-20 08:00:07 more
  • Rust語言 - 介面設計的建議之顯而易見(Obvious)

    # Rust語言 - 介面設計的建議之顯而易見(Obvious) - [Rust API 指南 GitHub](https://github.com/rust-lang/api-guidelines): - [Rust API 指南 中文](https://rust-chinese-translat ......

    uj5u.com 2023-06-20 08:00:02 more
  • java后端接入微信小程式登錄功能

    # 前言 此文章是Java后端接入微信登錄功能,由于專案需要,舍棄了解密用戶資訊的`session_key`,只保留`openid`用于檢索用戶資訊 后端框架:spring boot 小程式框架:uniapp # 流程概括 - 官方流程:通過自定義登錄態與openid,session_key關聯,之 ......

    uj5u.com 2023-06-20 07:59:54 more
  • 一種實作Spring動態資料源切換的方法

    ## 1 目標 不在現有查詢代碼邏輯上做任何改動,實作dao維度的資料源切換(即表維度) ## 2 使用場景 節約bdp的集群資源。接入新的寬表時,通常uat驗證后就會停止集群釋放資源,在對應的查詢服務器uat環境時需要查詢的是生產庫的表資料(uat庫表因為bdp實時任務停止,沒有資料落入),只進行 ......

    uj5u.com 2023-06-20 07:59:44 more
  • java~搞懂Comparable介面的compareTo方法

    `Comparable` 介面的 `compareTo` 方法的升序或降序取決于實作該介面的類的具體實作。按照慣例,`compareTo` 方法應該回傳負數、零或正數來指示當前物件是小于、等于還是大于傳入的物件。具體來說: - 如果 `this` 物件小于傳入的物件,則 `compareTo` 應該 ......

    uj5u.com 2023-06-20 07:59:36 more
  • ElasticSearch的使用和介紹

    # 1、概述 ## 功能 Elasticsearch 是一個分布式的 RESTful 搜索和分析引擎,可用來集中存盤您的資料,以便您對形形色色、規模不一的資料進行搜索、索引和分析。 例如: - 在電商網站搜索商品 ![image](https://img2023.cnblogs.com/blog/3 ......

    uj5u.com 2023-06-20 07:54:11 more
  • Java 運算子的使用

    # Java 運算子的使用 # 1.算術運算子 ## 算術運算子包括: +, -, *, /, %, ++, --,其中需要注意的是%,++,--; ## % 取模運算也叫做取余,在 Java 中取余的規則: a % b = a - a / b * b,如果是小數的話是這樣:a % b = a- ( ......

    uj5u.com 2023-06-20 07:48:58 more
  • 【python基礎】函式-值傳遞

    為了更好的認識函式,我們還要研究值傳遞問題,再研究這個問題之前,我們已經知道了函式之間的值傳遞,是實參變數值傳遞給形參變數,然后讓形參變數在函式內完成相應的功能。但是因為資料型別的不同,這里的值傳遞產生的對實參變數的效果是不同的 # 1.傳遞資料本質 引數傳遞之間傳遞的肯定是資料,而這種資料本質上是 ......

    uj5u.com 2023-06-20 07:43:39 more