我有一個名為的 dag my_dag.py
,它利用 Airflow 2 中的 S3KeySensor 來檢查是否存在 s3 密鑰。當我直接在 dag 內部使用傳感器時,它可以作業:
with TaskGroup('check_exists') as check_exists:
path = 's3://my-bucket/data/my_file'
poke_interval = 30
timeout = 60*60
mode = 'reschedule'
dependency_name = 'my_file'
S3KeySensor(
task_id = 'check_' dependency_name '_exists',
bucket_key = path,
poke_interval = poke_interval,
timeout = timeout,
mode = mode
)
上面的日志如下所示:
[2022-05-03, 19:51:26 UTC] {s3.py:105} INFO - Poking for key : s3://my-bucket/data/my_file
[2022-05-03, 19:51:26 UTC] {base_aws.py:90} INFO - Retrieving region_name from Connection.extra_config['region_name']
[2022-05-03, 19:51:27 UTC] {taskinstance.py:1701} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
這是對的。預計會重新安排,因為該檔案尚不存在。
但是,我想檢查其他 dag 中的任意數量的路徑,因此我將傳感器移動到test
另一個名為helpers.py
. 我在my_dag.py
呼叫test
. 它看起來像這樣:
with TaskGroup('check_exists') as check_exists:
path = 's3://my-bucket/data/my_file'
dependency_name = 'my_file'
wait_for_dependencies = PythonOperator(
task_id = 'wait_for_my_file',
python_callable = test,
op_kwargs = {
'dependency_name': dependency_name,
'path': path
},
dag = dag
)
wait_for_dependencies
中的函式如下test
所示helpers.py
:
def test(dependency_name, path, poke_interval = 30, timeout = 60 * 60, mode = 'reschedule'):
S3KeySensor(
task_id = 'check_' dependency_name '_exists',
bucket_key = path,
poke_interval = poke_interval,
timeout = timeout,
mode = mode
)
但是,當我運行 dag 時,即使檔案不存在,該步驟也會被標記為成功。日志顯示:
[2022-05-03, 20:07:54 UTC] {python.py:175} INFO - Done. Returned value was: None
[2022-05-03, 20:07:54 UTC] {taskinstance.py:1282} INFO - Marking task as SUCCESS.
似乎氣流不喜歡通過 python 運算子使用傳感器。這是真的?還是我做錯了什么?
我的目標是遍歷多條路徑并檢查每條路徑是否存在。但是,我在其他 dag 中執行此操作,這就是為什么我將傳感器放在駐留在另一個檔案中的函式中的原因。
如果有其他想法可以做到這一點,我很開放!
謝謝你的幫助!
uj5u.com熱心網友回復:
這不會像您期望的那樣作業。您在運算子中創建了一個運算子案例。有關這意味著什么的資訊,請參閱此答案。
在您的情況下,您將 with 包裹S3KeySensor
起來PythonOperator
。這意味著在PythonOperator
運行時它只執行 init 函式S3KeySensor
- 它不會呼叫運算子本身的邏輯。在運算子中使用運算子是一種不好的做法。
當您嘗試在操作員內部使用傳感器時,您的情況更加極端。傳感器需要poke()
為每個戳回圈呼叫該函式。簡化 - 當你像你一樣設定它們時,你不能享受傳感器的強大功能,因為重新安排意味著如果條件尚未滿足但不知道如何執行mode = 'reschedule'
,你想釋放作業人員。PythonOperator
如何解決您的問題:
選項1:
從您展示的代碼中,您可以簡單地執行以下操作:
with TaskGroup('check_exists') as check_exists:
path = 's3://my-bucket/data/my_file'
dependency_name = 'my_file'
S3KeySensor(
task_id='check_' dependency_name '_exists',
bucket_key=path,
poke_interval=30,
timeout=60 * 60,
mode='reschedule'
)
我沒有看到為什么這對你不起作用的原因。
選項 2:
如果由于某種原因選項 1 對您不利,則創建一個也接受的自定義傳感器dependency_name
,path
并像任何其他操作員一樣使用它。我沒有測驗它,但類似以下的東西應該可以作業:
class MyS3KeySensor(S3KeySensor):
def __init__(
self,
*,
dependency_name:str = None,
path: str = None,
**kwargs,
):
super().__init__(**kwargs)
self.task_id = task_id = 'check_' dependency_name '_exists'
self.bucket_name = path
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/470554.html