I'm trying to turn this converter (XML files from S3 to JSON) into a multithreaded application so I can speed up processing multiple files (there are 985 of them). Since each file is around 1 GB, I'd like to dispatch 8 of them at a time for parsing.
Whenever I run it, I get: RuntimeWarning: coroutine 'process_object' was never awaited
Here is the high-level code:
async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    return f"Processed (unknown) in {time.time() - start} seconds"

if "__main__" == __name__:
    objects = get_objects(top_n=3)  # list of prefixes for S3
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [
            asyncio.wrap_future(future)
            for future in [
                loop.run_in_executor(executor, process_object, url)
                for url in objects
            ]
        ]
        results = loop.run_until_complete(asyncio.gather(*futures))
    loop.close()
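For context on why this warning appears: `loop.run_in_executor(executor, process_object, url)` calls `process_object(url)` in a worker thread, but since `process_object` is an `async def`, that call only *creates* a coroutine object; nothing ever awaits it, and when it is garbage-collected Python emits the RuntimeWarning. A minimal, self-contained repro of the same mechanism (the `work` function here is a placeholder, not from the code above):

```python
import asyncio
import warnings

async def work(x):
    # an async def: calling work(1) only builds a coroutine object
    await asyncio.sleep(0)
    return x

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    coro = work(1)  # nothing runs yet; this is just a coroutine object
    del coro        # discarding it un-awaited triggers the RuntimeWarning
                    # (CPython's refcounting collects it immediately)

print(caught[0].category.__name__ if caught else "no warning")
```

This prints `RuntimeWarning`, matching the message in the question: the coroutine was built but never awaited.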
Reply from a helpful uj5u.com user:
I've modified and simplified your code. I'm not sure why you're combining thread-pool futures with asyncio; if you want to limit the number of workers, you can use a semaphore in asyncio.
Below is the code without concurrent.futures, simplified, since I couldn't fully reproduce the error above locally.
Try this:
async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    print(f"Processed (unknown) in {time.time() - start} seconds")

async def process_objects_bg(objects):
    resp = await asyncio.gather(*[process_object(url) for url in objects])
    return resp

if "__main__" == __name__:
    objects = get_objects(top_n=3)  # list of prefixes for S3
    asyncio.run(process_objects_bg(objects))
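As a follow-up to the semaphore suggestion above, here is a minimal sketch of how `asyncio.Semaphore` can cap concurrency at 8 files at a time (the goal stated in the question). The `asyncio.sleep` call is a placeholder standing in for the real download-and-parse work, and the file names are made up for the example:

```python
import asyncio

async def process_object(filename, sem):
    async with sem:  # at most `limit` coroutines run this block concurrently
        await asyncio.sleep(0.01)  # placeholder for download + parse
        return filename

async def process_objects_bg(objects, limit=8):
    sem = asyncio.Semaphore(limit)
    # gather preserves input order in its results
    return await asyncio.gather(*(process_object(u, sem) for u in objects))

results = asyncio.run(
    process_objects_bg([f"file-{i}" for i in range(20)], limit=8)
)
print(len(results))
```

This prints `20`: all objects are processed, but no more than 8 are in flight at once. Note that for truly CPU- or I/O-blocking work (like a synchronous `download_from_s3`), the blocking calls would still need to be moved off the event loop, e.g. with `asyncio.to_thread`.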