python multiprocessing庫使用記錄
需求是想并行呼叫形式化分析工具proverif,同時發起對多個query的分析(378個),實驗室有40核心80執行緒的服務器(雙cpu,至強gold 5218R*2),
觀察到單個命令在分析時記憶體占用不大,且只使用單核心執行,因此考慮同時呼叫多個命令同時執行分析,加快結果輸出,
最底層的邏輯是呼叫多個命令列陳述句,和在命令列直接執行proverif陳述句類似,在python中也就是使用 os.system()
函式實作命令呼叫,然而由于存在如下問題,需要考慮使用多行程multiprocessing庫,
- 如果使用多執行緒threading庫,由于GIL的存在,是否會因為一個行程未執行結束而無法發起新的行程?
- query數量很大的原因來自于多場景分析,同時對于同一場景下的query也希望可以并行推進,同時分析,
- query數量大+場景多,得到很多結果,每條分析陳述句都有各自不同的位置,需要生成大量的命令,
- 每條query執行完成后會給出分析結果,雖然分析結果會以html檔案的形式輸出到指定結果檔案夾,但是不能對分析結果做統一的分析,仍舊需要逐個閱讀,希望能在輸出后即時統計,原有輸出不變,還能給出分析結果表,
- 盡管proverif在分析上速度已經很好了,但是仍然有62條query在30000秒(8.3h)后未給出結果,希望能夠統計每一條query的運行時間并記錄,并能夠提供當前仍在執行的query數量,
- 進一步的,設定最高分析時長上限(如48h),若超出上限則終止分析,
- 對于一些可達性查詢(reachability,實作方法是:在物體執行最后,在公開信道上發送執行完成標記,檢查攻擊者是否檢驗物體代碼是否正確,以及攻擊者是否能夠阻止合法物體正常執行程式(如何做?)),會出現構建攻擊路徑很慢的情況,但是實際上已經給出了goal reachable的結果,對于這種其實無需浪費更多時間,可以把reachability的query添加
set reconstructTrace = false .
以提前結束, - 對于數量監控,需要多行程讀寫共享變數;對于運行時間記錄,需要多行程讀寫同一個檔案,
mutliprocessing庫使用
主要使用multiprocessing.Pool()
來創建行程池,當前python行程會創建新的python行程用于執行函式,(win下是子行程,linux下是fork)
由于存在作業系統上的差異,請使用if __name__ == '__main__':
來撰寫主函式,否則可能出現問題,主函式內容如下,
query_num = multiprocessing.Value('i', 0)
def long_time_task(c, ):
start = time.time()
os.system(c)
end = time.time()
# task_name=...
with query_num.get_lock():
query_num.value -= 1
print('Task %s runs %0.2f seconds. ' % (task_name, (end - start)) + str(query_num.value) + ' left.')
return 'Task %s runs %0.2f seconds.' % (task_name, (end - start))
def call_back(s):
with open('/home/dell/proverif/DDS/time.txt', "a+") as file:
file.writelines(s + '\n')
if __name__ == '__main__':
query_list = extract(path_query, 'query', '.')
query_file_path_list = query_file(query_list)
whole_cS = compromise_Scenarios(path_compromise, path_process_whole, work_path)
MAC_cS = compromise_Scenarios(path_compromise, path_process_MAC, work_path)
cmd = []
cmd += (pv_cmd(query_file_path_list, whole_cS, path_result))
cmd += (pv_cmd(query_file_path_list, MAC_cS, path_result))
p = Pool(len(cmd))
query_num.value = https://www.cnblogs.com/biing/archive/2023/07/07/len(cmd)
# for i in cmd:
# p.apply_async(long_time_task, args=(i,), callback=call_back)
results = [p.apply_async(long_time_task, (i,), callback=call_back) for i in cmd]
print('Waiting for all subprocesses done...')
output = [result.get(timeout=24*60*60) for result in results]
# p.close()
# p.join()
print('All subprocesses done.')
主函式前7行為文本處理,其內容不細表,
第8行p = Pool(len(cmd))
創建了行程池,其長度為cmd的個數,也就是我們要同時發起這么多個行程,接下來注釋掉的回圈是常規的多行程發起辦法,即使用apply_async函式執行我們要的函式,args是long_time_task的引數,由于需要為Iterable且只有一個引數,因此以元組形式傳入,
call_back
引數為回呼函式,這里很像go語言下的defer,會在函式執行后再執行,回呼函式接受long_time_task
的回傳值作為引數,我們使用這個機制實作多行程寫檔案,long_time_task
在回傳后會受到行程池p的調度,依次執行寫檔案操作,因此避免了同時寫引起沖突,
對于剩余的query數量,使用全域變數query_num = multiprocessing.Value('i', 0)
,這樣的變數具有鎖,可以供多行程讀寫,每個query在完成后會將數量減一,輸出時間和剩余數量,使用with query_num.get_lock():
獲得鎖,避免讀寫沖突,并在使用完成后自動釋放,
這已經滿足了基本需求,還有一個定時終止的功能有待實作,接下來再介紹我不斷修改的思路,
多行程定時終止
單行程定時終止
process = multiprocessing.Process(target=long_time_task)
# 啟動行程
process.start()
# 設定運行時長上限(48小時)
timeout = 48 * 60 * 60 # 以秒為單位
# 創建定時器,在指定時間后終止行程
timer = multiprocessing.Timer(timeout, process.terminate)
timer.start()
# 等待行程結束
process.join()
使用定時器的辦法,在一定時間后呼叫我們創建行程的process.terminate()方法結束行程,但我們需要多行程并行,
多行程定時終止
pool = multiprocessing.Pool()
# 準備要執行函式的引數串列
inputs = [1, 2, 3, 4, 5]
# 執行函式,并設定最大運行時長為30秒
result = pool.map_async(long_time_task, inputs)
# 獲取結果,最多等待30秒
output = result.get(timeout=48 * 60 * 60)
map_async
方法可以將函式應用于可迭代的引數串列,并回傳一個AsyncResult
物件,可以使用該物件的get
方法獲取結果,map_async
方法將任務提交給行程池后會立即回傳,并不會等待所有任務執行完成,如果在get
方法獲取結果時,其中某些任務仍在執行,將會等待直到超時,get
方法擁有timeout引數,超時后會raise TimeoutError
,報錯終止python程式的運行,因此如果想輸出已完成的結果,有兩個思路:
- try-except捕獲
TimeoutError
,并針對處理, - 對每個結果都使用get方法并設定超時時間,
串列推導式
results = [p.apply_async(long_time_task, (i,), callback=call_back) for i in cmd]
print('Waiting for all subprocesses done...')
output = [result.get(timeout=24*60*60) for result in results]
# p.close()
# p.join()
print('All subprocesses done.')
使用apply_async
方法來執行函式,該方法會也會回傳一個AsyncResult
物件,我們將這些物件放入results陣列,接著使用陣列中每個元素的結果組成output陣列并定義超時時間,這樣就可以執行call_back函式了,output內容其實不是很重要,主要是為了使用AsyncResult物件的get方法來設定定時器,
不過這樣還是需要try-except捕獲TimeoutError
,以處理超時未完成的query,這樣做比map_async
好在哪里?我在使用的時候map_async似乎不能成功呼叫回呼函式,還有待試驗,此外,該方法并不能在設定時間時準時停下,例如我設定時間5s,則會在約12秒時才停止,
還有一個問題是,在pycharm里運行腳本時,會有部分行程無法結束,暫不清楚其原因,也不確定命令列下執行腳本是否存在同樣的問題,
與Go相比
顯著的感覺到python在處理多行程、多執行緒、并發等問題上有一定的弱點,雖然能夠通過一系列操作實作,但是做起來比較吃力,也不算太優雅,現在的腳本已經可以并行分析了,然而在任務管理器中,除了看到了378個proverif行程,還看到了378個sh和378個python??
這378個python其實是沒必要的,如果使用goroutine,發起多個協程執行shell命令即可,這樣在開銷和效率上都會更好,不過其實總的記憶體占用并不太高,所以這個點不算非常大的問題,但是如果使用更復雜的分析工具,mutliprocessing多行程呼叫就太笨拙了,
此外,在共享變數的訪問上也不那么容易,例如query_num這個共享變數,多行程的訪問就不是很方便,如果使用goroutine,則可以使用channel,創建一個monitor goroutine來接受各個goroutine的回傳值,并做計數處理;檔案讀寫也可以在channel+監視器內完成,無需考慮檔案讀寫爭用,或者也可以用mutex來互斥地讀寫檔案,
在計時器的操作上,目前只了解到python的解決辦法是拋出超時例外,這種方法會使得沒執行完畢的腳本無法正常回傳,不能給出資訊,需要根據執行結果做一些特定的例外處理,也可能這是會有部分行程無法正常結束的原因,go語言有Timer類可以執行計時器操作,有望更優雅的解決問題,
后續計劃
后續會考慮使用go語言撰寫一個專用于proverif的多行程并發分析呼叫工具,使用更優雅、效率更高的方法,實作:
- 多行程并發呼叫
- query分析時長記錄、分析結果匯出
- 總分析時長上限設定,超時后正常退出,并標記超時query,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/556769.html
標籤:其他
上一篇:python:匯入庫、模塊失敗
下一篇:返回列表