使用python,我試圖演示當消費者執行緒最終等待一個空佇列時,生產者/消費者多執行緒場景如何導致死鎖,該佇列將在其余的執行程序中保持空,直到它結束和如何解決這個避免饑餓或程式突然“臟中斷”的問題。
所以我在這篇不錯的 RealPython 文章中使用佇列從生產者/消費者執行緒中獲取代碼,這里是原始代碼摘錄:
def consumer(queue, event):
"""Pretend we're saving a number in the database."""
while not event.is_set() or not queue.empty():
message = queue.get()
logging.info(
"Consumer storing message: %s (size=%d)", message, queue.qsize()
)
logging.info("Consumer received event. Exiting")
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
pipeline = queue.Queue(maxsize=10)
event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event)
time.sleep(0.1)
logging.info("Main: about to set event")
event.set()
我注意到,盡管不太可能發生,代碼會導致我在“主”執行緒設定“事件”的情況下描述的情況,并且“生產者”在“消費者”仍在等待時結束從佇列中獲取訊息。
為了在單個“消費者”的情況下解決問題,只需在“get”指令呼叫就足夠之前檢查佇列是否為空(例如:)
if (not q.empty()): message = q.get()
。但是,這個問題在多消費者場景中仍然存在,因為執行緒可以在佇列空檢查后立即交換,另一個消費者(第二個)可以獲得訊息,使佇列為空,以便交換回前一個消費者(第一個)它會在一個空佇列上呼叫 get 并且......就是這樣。
我想尋找一種即使在假設的多消費者場景中也可能有效的解決方案。所以我以這種方式修改了“消費者”代碼,實質上在佇列獲取指令上添加了超時并管理例外:
def consumer(q, event, n):
while not event.is_set() or not q.empty():
print("Consumer" n ": Q-get")
try:
time.sleep(0.1) #(I don't really need this, I just want to force a consumer-thread swap at this precise point :=> showing that, just as expected, things will work well in a multi-consumer scenario also)
message = q.get(True,1)
except queue.Empty:
print("Consumer" n ": looping on empty queue")
time.sleep(0.1) #(I don't really need this at all... just hoping -unfortunately without success- _main_ to swap on ThreadPoolExecutor)
continue
logging.info("Consumer%s storing message: %s (size=%d)", n,message,q.qsize())
print("Consumer" n ": ended")
并且還修改了“主要”部分以使其將訊息放入佇列并使其產生第二個消費者而不是生產者......
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")
pipeline = queue.Queue(maxsize=10)
event = threading.Event()
pipeline.put("XxXxX")
print("Let's start (ThreadPoolExecutor)")
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
#executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event, '1')
executor.submit(consumer, pipeline, event, '2')
print("(_main_ won't even get this far... Why?)") #!#
time.sleep(2)
logging.info("Main: about to set event")
event.set()
(請注意,我在這里的目的是阻止消費者死鎖風險并表明它實際上是無效的,在這個階段我并不真正需要生產者,這就是我讓代碼不產生它的原因)
現在,問題是,我不明白為什么,如果執行緒是用 生成的,一切似乎都運行良好
threading.Thread(...).start()
,例如:
print("Let's start (simple Thread)")
for i in range(1,3): threading.Thread(target=consumer, args=(pipeline, event, str(i))).start()
而使用concurrent.futures.ThreadPoolExecutor
它似乎使'主'執行緒永遠不會恢復(似乎它甚至沒有進入它的睡眠呼叫),因此執行永遠不會結束導致無限的消費者回圈......你能幫我理解為什么這個“不同之處”?知道這對我來說很重要,我想幾乎可以肯定會幫助我理解它是否可以以某種方式解決,或者我是否必須被迫不使用 ThreadPoolExecutor,所以......提前感謝您在這方面的寶貴幫助!
uj5u.com熱心網友回復:
問題是你event.set()
把with
管理ThreadPoolExecutor
. 與 一起使用時with
,在退出時with
,ThreadPoolExecutor
執行等效于.shutdown(wait=True)
. 所以你在等待工人完成,他們不會,因為你還沒有設定event
.
如果您希望能夠告訴它何時關閉,但不立即等待,您可以這樣做:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
try:
#executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event, '1')
executor.submit(consumer, pipeline, event, '2')
executor.shutdown(wait=False) # No new work can be submitted after this point, and
# workers will opportunistically exit if no work left
time.sleep(2)
logging.info("Main: about to set event")
finally:
event.set() # Set the event *before* we block awaiting shutdown
# Done in a finally block to ensure its set even on exception
# so the with doesn't try to block in a case where
# it will never finish
# Blocks for full shutdown here; after this, the pool is definitely finished and cleaned up
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/470143.html
標籤:Python python-3.x 多线程
上一篇:使消費者/生產者問題更高??級