Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面提供常見工作流程問題的疑難排解步驟和資訊。
許多 DAG 執行問題都是因為環境效能不佳所致。您可以按照這篇文章的說明,將環境調整至最佳狀態。
部分 DAG 執行問題可能是因為 Airflow 排程器無法正常運作或運作不佳。請按照排程工具疑難排解操作說明解決這些問題。
排解工作流程問題
如何開始進行疑難排解:
查看 Airflow 記錄。
您可以覆寫下列 Airflow 設定選項,提高 Airflow 的記錄層級。
區段 鍵 值 logging
logging_level
預設值為 INFO
。將其設為DEBUG
,即可在記錄訊息中獲得更多詳細資訊。查看監控資訊主頁。
查看 Cloud Monitoring。
在 Google Cloud 主控台中,前往環境的元件頁面查看是否有錯誤。
在 Airflow 網頁介面中查看 DAG 的圖表檢視,檢查是否有失敗的工作執行個體。
區段 鍵 值 webserver
dag_orientation
LR
、TB
、RL
或BT
。
針對運算子錯誤進行偵錯
如何針對運算子錯誤進行偵錯:
- 檢查是否有與特定工作相關的錯誤。
- 查看 Airflow 記錄。
- 查看 Cloud Monitoring。
- 查看運算子專屬記錄。
- 修正錯誤。
- 將 DAG 上傳至
/dags
資料夾。 - 在 Airflow 網頁介面中,清除 DAG 的狀態記錄。
- 繼續或執行 DAG。
排解任務執行問題
Airflow 是分散式系統,其中包含許多實體,例如排程器、執行者和工作站,這些實體會透過工作佇列和 Airflow 資料庫彼此溝通,並傳送信號 (例如 SIGTERM)。下圖概略說明 Airflow 元件之間的連結。
在 Airflow 等分散式系統中,可能會發生一些網路連線問題,或是基礎架構可能會發生間歇性問題;這可能導致任務失敗,並重新排定執行時間,或是任務可能無法順利完成 (例如殭屍任務,或執行過程中卡住的任務)。Airflow 有處理這類情況的機制,可自動恢復正常運作。以下各節將說明 Airflow 執行工作時發生的常見問題:殭屍工作、終止執行個體和 SIGTERM 信號。
排解無效工作問題
Airflow 會偵測工作與執行工作的程序之間的兩種不相符情形:
Zombie 工作是指應執行但未執行的工作。如果工作程序已終止或未回應、Airflow 工作者因超載而未及時回報工作狀態,或是執行工作所在的 VM 已關閉,就可能發生這種情況。Airflow 會定期尋找這類工作,並根據工作設定重試或失敗。
找出殭屍工作
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-scheduler") textPayload:"Detected zombie job"
不死任務是指不應執行的工作。Airflow 會定期找出這類工作並終止。
以下各節將說明殭屍工作常見的原因和解決方法。
Airflow worker 記憶體不足
每個 Airflow 工作者最多可同時執行 [celery]worker_concurrency
個工作執行個體。如果這些工作執行個體的累積記憶體用量超過 Airflow 工作站的記憶體限制,系統會隨機終止其中一個程序,以釋出資源。
找出 Airflow 工作站記憶體不足事件
resource.type="k8s_node" resource.labels.cluster_name="GKE_CLUSTER_NAME" log_id("events") jsonPayload.message:"Killed process" jsonPayload.message:("airflow task" OR "celeryd")
有時,Airflow worker 的記憶體不足可能會導致在 SQL Alchemy 工作階段中,將格式錯誤的封包傳送至資料庫、DNS 伺服器或 DAG 呼叫的任何其他服務。在這種情況下,連線的另一端可能會拒絕或捨棄 Airflow 工作站的連線。例如:
"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"
解決方法:
最佳化工作,以便減少記憶體用量,例如避免使用頂層程式碼。
減少
[celery]worker_concurrency
。增加 Airflow 工作站的記憶體,以便因應
[celery]worker_concurrency
變更。在 2.6.0 以下的 Cloud Composer 2 版本中,如果這個值較低,請使用目前的公式更新
[celery]worker_concurrency
。
Airflow 工作站已遭到撤銷
在 Kubernetes 上執行工作負載時,Pod 會遭到淘汰,這是正常現象。如果 Pod 的儲存空間用盡,或為了釋出資源以便為優先順序較高的工作負載服務,GKE 就會將 Pod 逐出。
探索 Airflow 工作站撤銷
resource.type="k8s_pod" resource.labels.cluster_name="GKE_CLUSTER_NAME" resource.labels.pod_name:"airflow-worker" log_id("events") jsonPayload.reason="Evicted"
解決方法:
- 如果缺少儲存空間導致遭到淘汰,您可以減少儲存空間用量,或在不需要暫存檔案時立即移除。或者,您也可以增加可用儲存空間,或在使用
KubernetesPodOperator
的專屬 pod 中執行工作負載。
Airflow worker 已終止
Airflow 工作站可能會從外部移除。如果目前執行中的任務在優雅終止期間未完成,就會中斷,並可能最終遭到偵測為殭屍。
找出 Airflow 工作站 Pod 終止作業
resource.type="k8s_cluster" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.methodName:"pods.delete" protoPayload.response.metadata.name:"airflow-worker"
可能的情況和解決方法:
在環境修改期間 (例如升級或安裝套件),Airflow 工作站會重新啟動:
探索 Composer 環境修改
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("cloudaudit.googleapis.com%2Factivity")
您可以在沒有重要工作執行時執行這類作業,或啟用工作重試功能。
維護作業期間,各種元件可能會暫時無法使用。
瞭解 GKE 維護作業
resource.type="gke_nodepool" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.metadata.operationType="UPGRADE_NODES"
您可以指定維護期間,盡量減少
與重要工作執行作業重疊。
在 2.4.5 之前的 Cloud Composer 2 版本中,終止的 Airflow 工作者可能會忽略 SIGTERM 信號,並繼續執行工作:
瞭解 Composer 自動調整資源配置功能的縮減功能
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker-set") textPayload:"Workers deleted"
您可以升級至已修正此問題的較新 Cloud Composer 版本。
Airflow 工作站負載過高
Airflow 工作站可用的 CPU 和記憶體資源數量,受到環境設定的限制。如果資源用量接近上限,可能會導致資源爭用,並在執行工作時造成不必要的延遲。在極端情況下,如果資源在較長的時間內都不足,可能會導致殭屍工作。
解決方法:
- 監控 worker 的 CPU 和記憶體用量,並調整以避免超過 80%。
Airflow 資料庫負載過高
各種 Airflow 元件會使用資料庫相互通訊,特別是用於儲存工作例項的心跳。資料庫資源不足會導致查詢時間拉長,並可能影響工作執行。
有時,Airflow worker 的記錄中會顯示以下錯誤:
(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly
This probably means the server terminated abnormally before or while
processing the request.
解決方法:
- 避免在頂層 DAG 程式碼中使用太多
Variables.get
指令。請改用 Jinja 範本擷取 Airflow 變數的值。 - 在頂層 DAG 程式碼中,在 Jinja 範本中最佳化 (減少) xcom_push 和 xcom_pull 指令的用量。
- 建議升級至較大的環境規模 (中或大型)。
- 降低排程器數量
- 降低 DAG 剖析頻率。
- 監控資料庫的 CPU 和記憶體用量。
Airflow 資料庫暫時無法使用
Airflow 工作站可能需要一些時間來偵測並妥善處理間歇性錯誤,例如暫時性的連線問題。這可能會超過預設的殭屍偵測門檻。
查看 Airflow 心跳逾時時間
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker") textPayload:"Heartbeat time limit exceeded"
解決方法:
增加無效工作逾時時間,並覆寫
[scheduler]scheduler_zombie_task_threshold
Airflow 設定選項的值:區段 鍵 值 附註 scheduler
scheduler_zombie_task_threshold
新逾時時間 (以秒為單位) 預設值為 300
排解執行個體終止問題
Airflow 會使用終止執行個體機制關閉 Airflow 工作。這個機制適用於下列情況:
- 排程器終止未按時完成的工作。
- 工作逾時或執行時間過長。
當 Airflow 終止工作執行個體時,您可以在執行工作任務的 Airflow 工作站記錄中,看到下列記錄項目:
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Terminating instance.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
可能的解決方案:
檢查工作程式碼,找出可能導致工作執行時間過長的錯誤。
為 Airflow 工作站增加 CPU 和記憶體,以便加快工作執行速度。
提高
[celery_broker_transport_options]visibility_timeout
Airflow 設定選項的值。因此,排程器會等待更長的時間,等待工作完成,再將工作視為殭屍工作。對於耗時數小時的工作,這個選項特別實用。如果值過低 (例如 3 小時),排程器會將執行 5 或 6 小時的工作視為「掛起」(殭屍工作)。
提高
[core]killed_task_cleanup_time
Airflow 設定選項的值。較長的值可讓 Airflow 工作站有更多時間順利完成工作。如果值過低,Airflow 工作可能會突然中斷,無法有足夠的時間完成工作。
排解 SIGTERM 信號問題
Linux、Kubernetes、Airflow 排程器和 Celery 會使用 SIGTERM 信號終止負責執行 Airflow 工作站或 Airflow 工作項的程序。
在環境中傳送 SIGTERM 信號可能有以下幾種原因:
工作已變成殭屍工作,必須停止。
排程器發現工作重複,並傳送「終止」執行個體和 SIGTERM 訊號至工作,以便停止工作。
在水平 Pod 自動調度資源中,GKE 控制平面會傳送 SIGTERM 信號,移除不再需要的 Pod。
排程器可以將 SIGTERM 信號傳送至 DagFileProcessorManager 程序。Scheduler 會使用這類 SIGTERM 信號來管理 DagFileProcessorManager 程序生命週期,因此可以放心忽略。
範例:
Launched DagFileProcessorManager with pid: 353002 Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: [] Sending the signal Signals.SIGTERM to group 353002 Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
在 local_task_job 中,心跳回呼和退出回呼之間的競爭狀態,該工作會監控工作執行情形。如果心跳偵測到工作標示為成功,就無法區分工作本身是否成功,或是 Airflow 被告知將工作視為成功。不過,它會終止工作執行程式,而不會等待工作執行程式結束。
您可以放心忽略這類 SIGTERM 信號。工作已處於成功狀態,因此不會影響 DAG 執行作業的整體執行情形。
在成功狀態下,一般結束和終止工作之間的唯一差異,就是記錄項目
Received SIGTERM.
。圖 2. 心跳和退出回呼之間的競爭狀態 (按一下可放大) Airflow 元件使用的資源 (CPU、記憶體) 超過叢集節點允許的數量。
GKE 服務會執行維護作業,並將 SIGTERM 信號傳送至即將升級的節點上執行的 Pod。
當工作執行個體以 SIGTERM 終止時,您可以在執行工作時的 Airflow 工作站記錄中,看到下列記錄項目:
{local_task_job.py:211} WARNING - State of this instance has been externally set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed with exception
可能的解決方案:
當執行工作階段的 VM 記憶體不足時,就會發生這個問題。這與 Airflow 設定無關,而是與 VM 可用的記憶體量有關。
在 Cloud Composer 2 中,您可以為 Airflow 工作站指派更多 CPU 和記憶體資源。
您可以降低
[celery]worker_concurrency
並行作業的 Airflow 設定選項值。這個選項會決定特定 Airflow 工作站同時執行的工作數量。
如要進一步瞭解如何最佳化環境,請參閱「最佳化調整環境效能和成本效益」。
使用 Cloud Logging 查詢找出 Pod 重新啟動或遭到淘汰的原因
Cloud Composer 環境會使用 GKE 叢集做為運算基礎架構層。在本節中,您可以找到實用的查詢,協助找出 Airflow 工作站或 Airflow 排程器重新啟動或遭到逐出的理由。
您可以透過下列方式調整後續顯示的查詢:
您可以在 Cloud Logging 中指定所需的時間表。例如過去 6 小時、3 天,或自訂時間範圍。
您必須在 CLUSTER_NAME 中指定環境叢集的名稱。
您可以新增 POD_NAME,將搜尋範圍限制在特定 Pod 內。
探索重新啟動的容器
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME"
以下是另一種查詢,可將結果限制在特定 Pod 中:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
發現因記憶體不足事件而關閉的容器
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME"
以下是另一種查詢,可將結果限制在特定 Pod 中:
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
找出已停止執行的容器
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME"
以下是另一種查詢,可將結果限制在特定 Pod 中:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
更新或升級作業對 Airflow 工作執行作業的影響
更新或升級作業會中斷目前執行中的 Airflow 工作,除非工作是採用可延遲模式執行。
建議您在希望盡量減少對 Airflow 工作執行作業的影響時,執行這些作業,並在 DAG 和工作中設定適當的重試機制。
常見問題
以下各節說明一些常見 DAG 問題的徵兆和可能修正方式。
Airflow 工作已遭 Negsignal.SIGKILL
中斷
有時,工作可能會使用比 Airflow 工作站分配的記憶體還多。在這種情況下,可能會遭到 Negsignal.SIGKILL
中斷。系統會傳送此信號,避免進一步耗用記憶體,這可能會影響其他 Airflow 工作執行。在 Airflow 工作站的記錄中,您可能會看到下列記錄項目:
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Negsignal.SIGKILL
也可能會顯示為程式碼 -9
。
可能的解決方案:
降低 Airflow 工作站的
worker_concurrency
。增加 Airflow 工作站可用的記憶體量。
使用 KubernetesPodOperator 或 GKEStartPodOperator 來管理 Cloud Composer 中耗用大量資源的工作,以便進行工作隔離和自訂資源分配。
調整工作,減少記憶體用量。
工作因 DAG 剖析錯誤而失敗,但未產生記錄
有時可能會發生細微的 DAG 錯誤,導致 Airflow 排程器可以排定執行任務,DAG 處理器可以剖析 DAG 檔案,但 Airflow 工作站無法執行 DAG 中的任務,因為 DAG 檔案中存在程式設計錯誤。這可能會導致 Airflow 工作標示為 Failed
,且執行時沒有任何記錄。
解決方法:
在 Airflow 工作站記錄檔中確認,Airflow 工作站未因缺少 DAG 或 DAG 剖析錯誤而發生錯誤。
增加 DAG 剖析相關參數:
將 dagbag-import-timeout 增加至至少 120 秒 (或更多,視需要而定)。
將 dag-file-processor-timeout 調高至至少 180 秒 (或更長時間,視需要而定)。這個值必須高於
dagbag-import-timeout
。
另請參閱「檢查 DAG 處理器記錄」。
工作因資源壓力而失敗,但未產生記錄
症狀:在執行工作時,負責 Airflow 工作執行的 Airflow 工作站子程序會突然中斷。Airflow worker 記錄中顯示的錯誤可能類似以下內容:
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
解決方法:
在 Cloud Composer 2 中,為 Airflow 工作站提高記憶體限制。
如果您的環境也產生殭屍工作,請參閱「排解殭屍工作問題」。
如需記憶體不足問題的偵錯教學課程,請參閱「偵錯記憶體不足和 DAG 儲存空間不足的問題」。
工作因 Pod 中斷而失敗,但未產生記錄
Google Kubernetes Engine Pod 會遵循 Kubernetes Pod 生命週期和 Pod 淘汰機制。工作尖峰是 Cloud Composer 中 Pod 遭到淘汰的最常見原因。
如果特定 Pod 過度使用節點的資源,相對於節點的資源使用量預期值,就可能會發生 Pod 剔除情形。舉例來說,如果 Pod 中執行了多項記憶體密集的工作,且這些工作產生的總負載導致 Pod 執行的節點超出記憶體用量限制,就可能會發生淘汰。
如果 Airflow 工作站 Pod 遭到逐出,則該 Pod 上執行的所有工作執行個體都會中斷,並且稍後由 Airflow 標示為失敗。
記錄會經過緩衝處理。如果工作站 Pod 在緩衝區清除前遭到移除,就不會產生記錄。如果工作失敗但未產生記錄,表示 Airflow 工作站因記憶體不足 (OOM) 而重新啟動。即使未產生 Airflow 記錄,Cloud Logging 中仍可能會出現部分記錄。
如要查看記錄:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。
前往「Logs」分頁。
依序前往「All logs」>「Airflow logs」>「Workers」,查看個別 Airflow worker 的記錄檔。
症狀:
前往 Google Cloud 控制台的「Workloads」(工作負載)頁面。
如果有顯示
Evicted
的airflow-worker
Pod,請按一下每個已撤銷的 Pod,然後查看視窗頂端是否有顯示The node was low on resource: memory
訊息。
解決方法:
請檢查
airflow-worker
pod 的記錄,找出可能的淘汰原因。如要進一步瞭解如何從個別 Pod 擷取記錄,請參閱「排解已部署工作負載的問題」。確認 DAG 中的工作皆為冪等且可重試。
避免將不必要的檔案下載至 Airflow 工作站的本機檔案系統。
Airflow 工作者有有限的本機檔案系統容量。Airflow 工作站可使用 1 GB 到 10 GB 的儲存空間。當儲存空間用盡時,Airflow 工作站 Pod 會遭到 GKE 控制層驅逐。這會導致被逐出的 worker 執行的所有工作失敗。
以下列舉一些可能會造成問題的操作:
- 下載檔案或物件,並將這些檔案或物件儲存在本機的 Airflow worker 中。請改為直接將這些物件儲存在適當的服務中,例如 Cloud Storage 值區。
- 透過 Airflow 工作站存取
/data
資料夾中的大型物件。Airflow 工作站會將物件下載至本機檔案系統。請改為實作 DAG,以便在 Airflow 工作者 Pod 外處理大型檔案。
DAG 負載匯入逾時
症狀:
- 在 Airflow 網頁介面中,DAG 清單頁面頂端有一個紅色警示方塊顯示
Broken DAG: [/path/to/dagfile] Timeout
。 在 Cloud Monitoring 中:
airflow-scheduler
記錄檔包含類似以下的項目:ERROR - Process timed out
ERROR - Failed to import: /path/to/dagfile
AirflowTaskTimeout: Timeout
修正方式:
覆寫 dag_file_processor_timeout
Airflow 設定選項,並給予系統更多時間進行 DAG 剖析:
區段 | 鍵 | 值 |
---|---|---|
core |
dag_file_processor_timeout |
新逾時值 |
DAG 執行作業未在預期時間內結束
症狀:
有時,由於 Airflow 工作卡住,DAG 執行作業的時間可能會超出預期,導致 DAG 執行作業無法結束。在正常情況下,Airflow 任務不會無限期處於排隊或執行狀態,因為 Airflow 有超時和清理程序,有助避免這種情況發生。
修正方式:
請為 DAG 使用
dagrun_timeout
參數。例如:dagrun_timeout=timedelta(minutes=120)
。因此,每個 DAG 執行作業都必須在 DAG 執行逾時期限內完成。未完成的工作會標示為Failed
或Upstream Failed
。如要進一步瞭解 Airflow 工作狀態,請參閱 Apache Airflow 說明文件。使用工作執行逾時參數,為根據 Apache Airflow 運算子執行的工作定義預設逾時時間。
未執行 DAG 執行作業
症狀:
動態設定 DAG 的排程日期時,可能會導致各種意外的副作用。例如:
DAG 執行作業一律在未來,且 DAG 永遠不會執行。
過去的 DAG 執行作業會標示為已執行且成功,但實際上並未執行。
詳情請參閱 Apache Airflow 說明文件。
可能的解決方案:
請按照 Apache Airflow 說明文件中的建議操作。
為 DAG 設定靜態
start_date
。您可以使用catchup=False
來停用過去日期的 DAG 執行作業。除非您瞭解這種做法可能產生的副作用,否則請避免使用
datetime.now()
或days_ago(<number of days>)
。
進出 Airflow 資料庫的網路流量增加
環境 GKE 叢集和 Airflow 資料庫之間的流量網路數量,取決於 DAG 數量、DAG 中的任務數量,以及 DAG 存取 Airflow 資料庫中資料的方式。以下因素可能會影響網路用量:
對 Airflow 資料庫執行查詢。如果 DAG 執行大量查詢,就會產生大量流量。例如:在繼續執行其他工作前檢查工作狀態、查詢 XCom 資料表、傾印 Airflow 資料庫內容。
工作數量龐大。要排程的工作越多,產生的網路流量就越多。這項考量事項適用於 DAG 中的總工作數量和排程頻率。Airflow 排程器排定 DAG 執行作業時,會對 Airflow 資料庫進行查詢並產生流量。
Airflow 網頁介面會產生網路流量,因為它會查詢 Airflow 資料庫。大量使用含有圖表、工作和圖表的頁面,可能會產生大量網路流量。
DAG 導致 Airflow 網路伺服器當機或傳回「502 gateway timeout」錯誤
網路伺服器錯誤有幾個可能成因。在 Cloud Logging 中查看 airflow-webserver 記錄,判斷 502 gateway timeout
錯誤的原因。
在 DAG 和外掛程式資料夾中處理大量 DAG 和外掛程式
/dags
和 /plugins
資料夾的內容會從環境的值區同步至 Airflow 工作站和排程器的本機檔案系統。
這些資料夾中儲存的資料越多,同步處理作業所需的時間就越長。如要解決這類情況,請按照下列步驟操作:
限制
/dags
和/plugins
資料夾中的檔案數量。只儲存必要的檔案。增加 Airflow 排程器和工作站可用的磁碟空間。
增加 Airflow 排程器和工作站的 CPU 和記憶體,以便加快同步作業的執行速度。
如果 DAG 數量龐大,請將 DAG 分成批次,壓縮成 ZIP 封存檔,然後將這些封存檔部署至
/dags
資料夾。這個方法可加快 DAG 同步處理程序。Airflow 元件會在處理 DAG 前解壓縮 ZIP 封存檔。以程式輔助方式產生 DAG 也是限制
/dags
資料夾中 DAG 檔案數量的一種方法。請參閱「程式輔助 DAG」一節,避免以程式輔助方式產生 DAG 時發生排程和執行問題。
請勿同時安排以程式輔助方式產生的 DAG
透過程式輔助方式從 DAG 檔案產生 DAG 物件,是編寫許多相似 DAG (僅有細微差異) 的有效方法。
請務必不要將所有這類 DAG 排程為立即執行。Airflow 工作站很可能沒有足夠的 CPU 和記憶體資源,無法執行所有同時排定的任務。
如要避免排定程式輔助 DAG 時發生問題,請按照下列步驟操作:
- 增加 worker 並行作業數量,並擴充環境,以便同時執行更多工作。
- 產生 DAG 時,請以均勻分配時間的方式安排 DAG 的排程,避免同時排定數百個任務,讓 Airflow 工作站有時間執行所有排定的任務。
存取 Airflow 網路伺服器時發生錯誤 504
請參閱「存取 Airflow UI 時發生錯誤 504」一文。
在任務執行期間或執行後,系統會在查詢例外狀況發生時,中斷與 Postgres 伺服器的連線
Lost connection to Postgres server during query
例外狀況通常會在符合下列條件時發生:
- DAG 使用
PythonOperator
或自訂運算子。 - DAG 會查詢 Airflow 資料庫。
如果從可呼叫的函式發出多個查詢,則追蹤記錄可能會錯誤地指向 Airflow 程式碼中的 self.refresh_from_db(lock_for_update=True)
行,這是任務執行後的第一個資料庫查詢。例外狀況的實際原因發生在這個步驟之前,也就是 SQLAlchemy 工作階段未正確關閉時。
SQLAlchemy 工作階段的範圍為執行緒,並在可呼叫的函式工作階段中建立,之後可在 Airflow 程式碼中繼續執行。如果在單一工作階段內的查詢之間有明顯延遲,連線可能已由 Postgres 伺服器關閉。Cloud Composer 環境中的連線逾時時間約為 10 分鐘。
解決方法:
- 使用
airflow.utils.db.provide_session
修飾符。這個修飾符會在session
參數中,為 Airflow 資料庫提供有效的工作階段,並在函式結束時正確關閉工作階段。 - 請勿使用單一長時間執行的函式。請改為將所有資料庫查詢移至個別函式,以便有多個含有
airflow.utils.db.provide_session
修飾符的函式。在這種情況下,系統會在擷取查詢結果後自動關閉工作階段。
控制 DAG、工作和相同 DAG 的並行執行時間
如果您想控制單一 DAG 執行作業針對特定 DAG 的執行時間,可以使用 dagrun_timeout
DAG 參數來執行這項操作。舉例來說,如果您希望單一 DAG 執行作業 (無論執行作業是否成功或失敗) 的執行時間不得超過 1 小時,請將這個參數設為 3600 秒。
您也可以控管單一 Airflow 工作可執行的時間長度。方法是使用 execution_timeout
。
如果您想控管特定 DAG 的有效 DAG 執行次數,可以使用 [core]max-active-runs-per-dag
Airflow 設定選項。
如果您希望在特定時間點只執行單一 DAG 例項,請將 max-active-runs-per-dag
參數設為 1
。
影響 DAG 和外掛程式同步處理至排程器、工作站和網路伺服器的問題
Cloud Composer 會將 /dags
和 /plugins
資料夾的內容同步處理至排程器和工作站。/dags
和 /plugins
資料夾中的某些物件可能會導致這項同步處理功能無法正常運作,或導致同步處理速度變慢。
/dags
資料夾會同步至排程器和工作站。這個資料夾不會同步至網頁伺服器。
/plugins
資料夾會同步處理至排程器、工作站和網路伺服器。
您可能會遇到下列問題:
您已將使用壓縮轉碼的 gzip 壓縮檔上傳至
/dags
和/plugins
資料夾。通常會發生在您在gcloud storage cp
指令中使用--gzip-local-all
旗標,將資料上傳至儲存體時。解決方法:刪除使用壓縮轉碼功能的物件,然後重新上傳至值區。
其中一個物件名為「.」:這類物件不會與排程器和工作者同步,且可能會停止同步。
解決方法:重新命名物件。
資料夾和 DAG Python 檔案的名稱相同,例如
a.py
。在這種情況下,DAG 檔案未正確同步至 Airflow 元件。解決方法:移除與 DAG Python 檔案同名的資料夾。
/dags
或/plugins
資料夾中的其中一個物件在物件名稱結尾含有/
符號。由於/
符號代表物件是資料夾,而非檔案,因此這類物件可能會干擾同步處理程序。解決方法:從有問題物件的名稱中移除
/
符號。請勿將不必要的檔案儲存在
/dags
和/plugins
資料夾中。有時,您導入的 DAG 和外掛程式會附帶其他檔案,例如用於儲存這些元件測試的檔案。這些檔案會與工作站和排程器同步,並影響將這些檔案複製到排程器、工作站和網路伺服器所需的時間。
解決方法:請勿在
/dags
和/plugins
資料夾中儲存任何額外和不必要的檔案。
Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' error is generated by schedulers and workers
發生這個問題的原因是,物件在 Cloud Storage 中的命名空間可能會重疊,而調度器和 worker 同時使用傳統檔案系統。舉例來說,您可以將同名的資料夾和物件新增至環境的儲存體。當桶區同步至環境的排程器和工作站時,系統會產生此錯誤,導致工作失敗。
如要修正這個問題,請確認環境值區中沒有重疊的命名空間。舉例來說,如果 /dags/misc
(檔案) 和 /dags/misc/example_file.txt
(另一個檔案) 都位於值區中,排程器就會產生錯誤。
連線至 Airflow 中繼資料資料庫時,發生暫時中斷
Cloud Composer 會在分散式基礎架構上執行。這表示偶爾可能會發生一些暫時性問題,而這些問題可能會中斷 Airflow 工作執行作業。
在這種情況下,您可能會在 Airflow worker 的記錄中看到下列錯誤訊息:
"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
或
"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
這類間歇性問題也可能由 Cloud Composer 環境執行的維護作業所造成。
這類錯誤通常是間歇性的,如果 Airflow 工作是冪等的,且您已設定重試,這些錯誤就不會影響您。您也可以考慮定義維護期間。
這類錯誤的另一個原因,可能是環境叢集中缺少資源。在這種情況下,您可以依照「擴充環境」或「最佳化環境」的操作說明,擴充或最佳化環境。
DAG 執行作業標示為成功,但沒有執行的工作
如果 DAG 執行作業 execution_date
早於 DAG 的 start_date
,您可能會看到 DAG 執行作業沒有任何任務執行作業,但仍標示為成功。

原因
這種情況可能發生在下列任一情況:
不相符的原因是 DAG 的
execution_date
與start_date
之間的時區差異。例如,當您使用pendulum.parse(...)
設定start_date
時,就可能發生這種情況。DAG 的
start_date
已設為動態值,例如airflow.utils.dates.days_ago(1)
解決方案
請確認
execution_date
和start_date
使用相同的時區。指定靜態
start_date
並與catchup=False
合併,避免執行具有過去開始日期的 DAG。
Airflow UI 或 DAG UI 中未顯示 DAG,且排程器未排定 DAG
DAG 處理器會剖析各個 DAG,接著排程器才能為 DAG 排程,並在 DAG 顯示在 Airflow UI 或 DAG UI 之前。
下列 Airflow 設定選項會定義 DAG 剖析的逾時值:
[core]dagrun_import_timeout
定義 DAG 處理器剖析單一 DAG 所需的時間。[core]dag_file_processor_timeout
定義 DAG 處理器可用於剖析所有 DAG 的總時間長度。
如果無法在 Airflow UI 或 DAG UI 中查看 DAG:
檢查 DAG 處理器記錄,確認 DAG 處理器是否能正確處理 DAG。如果發生問題,您可能會在 DAG 處理器或排程器記錄中看到下列記錄項目:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for /usr/local/airflow/dags/example_dag.py with PID 21903 started at 2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
檢查排程器記錄,確認排程器是否正常運作。發生問題時,您可能會在排程器記錄中看到下列記錄項目:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it Process timed out, PID: 68496
解決方法:
修正所有 DAG 剖析錯誤。DAG 處理器會剖析多個 DAG,在少數情況下,一個 DAG 的剖析錯誤可能會對其他 DAG 的剖析作業造成負面影響。
如果 DAG 的剖析時間超過
[core]dagrun_import_timeout
中定義的秒數,請增加這個逾時時間。如果剖析所有 DAG 的時間超過
[core]dag_file_processor_timeout
中定義的秒數,請增加這個逾時時間。如果 DAG 需要花費很長的時間進行剖析,也可能表示並未以最佳方式實作。例如,如果讀取許多環境變數,或執行對外部服務或 Airflow 資料庫的呼叫。請盡可能避免在 DAG 的全球性區段中執行這類作業。
增加排程器的 CPU 和記憶體資源,讓排程器能更快速地運作。
增加 DAG 處理器程序的數量,以便加快剖析速度。您可以透過提高
[scheduler]parsing_process
的值來完成此操作。
Airflow 資料庫負載過重的症狀
詳情請參閱「Airflow 資料庫負載壓力下的症狀」。