我正在制作一個多執行緒應用程式,其中主行程通過佇列將訊息發送到適當的執行緒。我的疑問在于執行緒的一部分:我發現的解決方案不斷監聽(達到一個限制,這就是為什么我有我的類 Clock 及其方法“isRunnig”,如果時間尚未到期則回傳 True),如果有些嘗試沒有資料到達,然后我捕獲例外并繼續。
我先把主流程的代碼簡化一下:
def callUpdate (self, update : Update): #Update is a class that includes the correct ID of its thread and the data to process by the thread.
find = False
wrapp : _Wrapper = None
for current in self.threads:
if (type(current) is not _Wrapper): #_Wrapper is a class that includes the thread
continue
if not current.theThread.is_alive() :
#Here I save some data, and I remove the thread from
self.threads.remove(current)
continue
if (current.id == update.id):
wrapp = current
find = True
break
#Here I do some things and then, I create a new thread if not found and send first message (the update itself in this first send), or if its found and working (alive), I just send the data to the thread. Wrapper creates a new queue and saves the thread to send more data later if needed.
if (not find):
wrapp = _Wrapper(data)
self.threads.append(wrapp)
wrapp.queue.put(update)
bot.start()
else:
#Thread already working and I send the update
wrapp.queue.put(update)
好吧,現在我包括一個簡化的執行緒部分,這讓我擔心,因為它看起來有點“草率”。請注意,我以 1 秒的暫停讀取訊息佇列。我有一個時鐘類,如果指示的時間已經過去(在本例中為 120 秒),它只會回傳
def process (self): #This process is part of the class that heritate from Thread (class ProcessThread (threading.Thread):
clock = Clock(seconds=120)
while (clock.isRunning()):
update: Update = self.getUpdateFromQueue(seconds=1)
if (update is None) : continue
#At this point, the message update is correct and I process the data. Once the clock is finnish, I finnish the process
return
問題是有時程式的執行速度變慢很多,執行緒少或執行緒多(似乎與它無關);我還嘗試減少佇列的重讀時間(因為如果有很多請求,它似乎會導致問題)。我覺得它很hacky,任何人都可以建議我在多執行緒中接收排隊資料的任何其他選項嗎?
謝謝
---------- 編輯 ---------- 對不起,我沒有包括從佇列中獲取資料的程序:
#Get data from queue, maximum wait time in seconds.
def getUpdateFromQueue (self, seconds=10):
max = datetime.datetime.now() datetime.timedelta(seconds=seconds)
current = datetime.datetime.now()
while (current < max):
try:
data : Update = self.queue.get(timeout=0.01)
return data
except Empty:
current = datetime.datetime.now()
continue
return None
uj5u.com熱心網友回復:
您的代碼無緣無故地旋轉等待,這自然會損害性能;您根本不應該在自己的代碼中這樣做。而是使用超時功能queue.Queue
來處理您的超時。
例如,getUpdateFromQueue
不需要在對queue.get
;的短時間呼叫之間回圈并查看掛起時間。它可以seconds
直接將最大值傳遞給queue.get
:
def getUpdateFromQueue(self, seconds=10):
try:
return self.queue.get(timeout=seconds)
except Empty:
return None
但是你不需要它首先是它自己的功能。代替:
def process(self):
clock = Clock(seconds=120)
while (clock.isRunning()):
update: Update = self.getUpdateFromQueue(seconds=1)
if (update is None) : continue
return
您可以queue.get
直接使用您嘗試使用您的Clock
類強制執行的總體最大超時:
def process(self):
try:
return self.queue.get(timeout=120)
except Empty:
return None
這應該具有相同的效果(回傳一條資料,在回傳之前等待最多 120 秒None
),沒有兩個不斷旋轉 CPU 的嵌套while
回圈(并且兩者都在做同樣的事情,只是解析度不同)。
如果您需要處理多條訊息,您只需要一個回圈,在其中調整每個訊息的超時時間get()
以反映總體截止日期。(我在time.monotonic()
這里使用它是因為它根據定義不會被系統時鐘的變化所拋棄。)
from queue import Empty
from time import monotonic
def process(self, data):
# do whatever you need to do with one piece of data
pass
def process_messages_with_timeout(self, timeout=120):
deadline = monotonic() timeout
while True:
try:
self.process(self.queue.get(timeout=deadline - monotonic()))
except Empty:
break
get()
重要的是,您應該只需要在實際超時的情況下對您實際想要獲取的每個專案進行一次呼叫;get()
使用比您想要的更短的超時時間然后添加額外的邏輯以在實際超時時間內重試是沒有意義的。在回圈中添加額外的回圈沒有任何意義。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/475207.html