Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página explica como o agendamento e o acionamento de DAG funcionam no Airflow, como definir um agendamento para um DAG e como acionar um DAG manualmente ou pausá-lo.
Sobre DAGs de fluxo de ar no Cloud Composer
Os DAGs do Airflow no Cloud Composer são executados em um ou mais ambientes do Cloud Composer no seu projeto. Você carrega os arquivos de origem dos seus DAGs do Airflow para um bucket do Cloud Storage associado a um ambiente. A instância do Airflow no ambiente analisa esses arquivos e agenda as execuções dos DAGs, conforme definido pela agenda de cada DAG. Durante uma execução de DAG, o Airflow agenda e executa tarefas individuais que compõem um DAG em uma sequência definida pelo DAG.
Para saber mais sobre os principais conceitos do Airflow, como DAGs do Airflow, execuções de DAG, tarefas ou operadores, consulte a página Conceitos principais na documentação do Airflow.
Sobre o agendamento de DAG no Airflow
O Airflow fornece os seguintes conceitos para seu mecanismo de agendamento:
- Data lógica
Representa uma data para a qual uma execução DAG específica é executada .
Esta não é a data real em que o Airflow executa um DAG, mas um período de tempo que uma execução específica de DAG deve processar. Por exemplo, para um DAG programado para ser executado todos os dias às 12h, a data lógica também seria 12h de um dia específico. Como ele é executado duas vezes por dia, o período de tempo que ele deve processar são as últimas 12 horas. Ao mesmo tempo, a lógica definida no próprio DAG pode não usar a data lógica ou o intervalo de tempo. Por exemplo, um DAG pode executar o mesmo script uma vez por dia sem usar o valor da data lógica.
Nas versões do Airflow anteriores à 2.2, essa data é chamada de data de execução .
- Data de execução
Representa uma data em que uma execução DAG específica é executada .
Por exemplo, para um DAG programado para ser executado todos os dias às 12:00, a execução real do DAG pode ocorrer às 12:05, algum tempo depois da data lógica.
- Intervalo de programação
Representa quando e com que frequência um DAG deve ser executado, em termos de datas lógicas.
Por exemplo, uma programação diária significa que um DAG é executado uma vez por dia, e as datas lógicas para suas execuções de DAG têm intervalos de 24 horas.
- Data de início
Especifica quando você deseja que o Airflow comece a agendar seu DAG.
As tarefas no seu DAG podem ter datas de início individuais ou você pode especificar uma única data de início para todas as tarefas. Com base na data mínima de início para as tarefas no seu DAG e no intervalo de agendamento, o Airflow agenda as execuções do DAG.
- Recuperação, preenchimento e novas tentativas
Mecanismos para executar execuções de DAG para datas passadas.
O Catchup executa execuções de DAG que ainda não foram executadas, por exemplo, se o DAG foi pausado por um longo período e depois reativado. Você pode usar o preenchimento automático para executar execuções de DAG para um determinado intervalo de datas. As tentativas especificam quantas tentativas o Airflow deve fazer ao executar tarefas de um DAG.
O agendamento funciona da seguinte maneira:
Após a data de início, o Airflow aguarda a próxima ocorrência do intervalo de programação.
O Airflow programa a primeira execução do DAG para acontecer no final deste intervalo de programação.
Por exemplo, se um DAG estiver programado para ser executado a cada hora e a data de início for às 12:00 de hoje, a primeira execução do DAG ocorrerá às 13:00 de hoje.
A seção "Agendar um DAG do Airflow" neste documento descreve como configurar o agendamento para seus DAGs usando esses conceitos. Para obter mais informações sobre execuções e agendamento de DAGs, consulte "Execuções de DAGs" na documentação do Airflow.
Sobre maneiras de acionar um DAG
O Airflow oferece as seguintes maneiras de acionar um DAG:
Acionar de acordo com uma programação . O Airflow aciona o DAG automaticamente com base na programação especificada para ele no arquivo DAG.
Acionar manualmente . Você pode acionar um DAG manualmente a partir deGoogle Cloud console, interface do usuário do Airflow ou executando um comando CLI do Airflow no Google Cloud CLI.
Acionar em resposta a eventos. A maneira padrão de acionar um DAG em resposta a eventos é usar um sensor .
Outras maneiras de acionar DAGs:
Acione programaticamente. Você pode acionar um DAG usando a API REST do Airflow . Por exemplo, a partir de um script Python.
Acione programaticamente em resposta a eventos. Você pode acionar DAGs em resposta a eventos usando as funções do Cloud Run e a API REST do Airflow .
Antes de começar
- Certifique-se de que sua conta tenha uma função que possa gerenciar objetos nos buckets de ambiente e visualizar e acionar DAGs. Para obter mais informações, consulte Controle de acesso .
Agende um DAG de fluxo de ar
Defina um agendamento para um DAG no arquivo DAG. Edite a definição do DAG da seguinte maneira:
Localize e edite o arquivo DAG no seu computador. Se você não tiver o arquivo DAG, poderá baixar uma cópia dele no bucket do ambiente . Para um novo DAG, você pode definir todos os parâmetros ao criá-lo.
No parâmetro
schedule_interval
, defina o agendamento. Você pode usar uma expressão Cron, como0 0 * * *
, ou uma predefinição, como@daily
. Para mais informações, consulte Cron e Intervalos de Tempo na documentação do Airflow.O Airflow determina datas lógicas para execuções de DAG com base na programação definida por você.
No parâmetro
start_date
, defina a data de início.O Airflow determina a data lógica da primeira execução do DAG usando este parâmetro.
(Opcional) No parâmetro
catchup
, defina se o Airflow deve executar todas as execuções anteriores deste DAG, da data de início até a data atual, que ainda não foram executadas.As execuções de DAG executadas durante a recuperação terão sua data lógica no passado e sua data de execução refletirá o momento em que a execução de DAG foi realmente executada.
(Opcional) No parâmetro
retries
, defina quantas vezes o Airflow deve tentar novamente as tarefas que falharam (cada DAG consiste em uma ou mais tarefas individuais). Por padrão, as tarefas no Cloud Composer são tentadas novamente duas vezes.Carregue a nova versão do DAG para o bucket do ambiente.
Aguarde até que o Airflow analise o DAG com sucesso. Por exemplo, você pode verificar a lista de DAGs em seu ambiente noGoogle Cloud console ou na interface do usuário do Airflow .
O exemplo de definição de DAG a seguir é executado duas vezes por dia, às 00:00 e às 12:00. Sua data de início é definida como 1º de janeiro de 2024, mas o Airflow não o executa para datas passadas após o upload ou a pausa porque a recuperação está desabilitada.
O DAG contém uma tarefa chamada insert_query_job
, que insere uma linha em uma tabela com o operador BigQueryInsertJobOperator
. Este operador é um dos Google Cloud Operadores do BigQuery , que você pode usar para gerenciar conjuntos de dados e tabelas, executar consultas e validar dados. Se uma execução específica dessa tarefa falhar, o Airflow a tentará novamente mais quatro vezes com o intervalo de repetição padrão. A data lógica para essas tentativas permanece a mesma.
A consulta SQL para esta linha usa modelos do Airflow para gravar a data lógica e o nome do DAG na linha.
import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
with DAG(
"bq_example_scheduling_dag",
start_date=datetime.datetime(2024, 1, 1),
schedule_interval='0 */12 * * *',
catchup=False
) as dag:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
retries=4,
configuration={
"query": {
# schema: date (string), description (string)
# example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
"query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
"useLegacySql": False,
"priority": "BATCH",
}
},
location="us-central1"
)
insert_query_job
Para testar este DAG, você pode acioná-lo manualmente e depois visualizar os logs de execução da tarefa .
Mais exemplos de parâmetros de agendamento
Os exemplos de parâmetros de agendamento a seguir ilustram como o agendamento funciona com diferentes combinações de parâmetros:
Se
start_date
fordatetime(2024, 4, 4, 16, 25)
eschedule_interval
for30 16 * * *
, a primeira execução do DAG ocorrerá às 16:30 do dia 5 de abril de 2024.Se
start_date
fordatetime(2024, 4, 4, 16, 35)
eschedule_interval
for30 16 * * *
, a primeira execução do DAG ocorrerá às 16h30 do dia 6 de abril de 2024. Como a data de início é posterior ao intervalo de agendamento em 4 de abril de 2024, a execução do DAG não ocorrerá em 5 de abril de 2024. Em vez disso, o intervalo de agendamento termina às 16h35 do dia 5 de abril de 2024, portanto, a próxima execução do DAG será agendada para as 16h30 do dia seguinte.Se
start_date
fordatetime(2024, 4, 4)
eschedule_interval
for@daily
, a primeira execução do DAG será agendada para 00:00 em 5 de abril de 2024.Se
start_date
fordatetime(2024, 4, 4, 16, 30)
e oschedule_interval
for0 * * * *
, a primeira execução do DAG será agendada para as 18h do dia 4 de abril de 2024. Após a data e a hora especificadas, o Airflow agendará uma execução do DAG para o minuto 0 de cada hora. O momento mais próximo em que isso acontece é às 17h. Nesse momento, o Airflow agendará uma execução do DAG para o final do intervalo de agendamento, ou seja, às 18h.
Acionar um DAG manualmente
Quando você aciona manualmente um DAG do Airflow, o Airflow executa o DAG uma vez, independentemente da programação especificada no arquivo DAG.
Console
Para acionar um DAG de Google Cloud console:
No Google Cloud console, vá para a página Ambientes .
Selecione um ambiente para visualizar seus detalhes.
Na página Detalhes do ambiente , acesse a aba DAGs .
Clique no nome de um DAG.
Na página de detalhes do DAG , clique em Acionar DAG . Uma nova execução de DAG será criada.
Interface de usuário do fluxo de ar
Para acionar um DAG na interface do usuário do Airflow:
No Google Cloud console, vá para a página Ambientes .
Na coluna do servidor web Airflow , siga o link Airflow para seu ambiente.
Faça login com a Conta do Google que tenha as permissões apropriadas.
Na interface da web do Airflow, na página DAGs , na coluna Links do seu DAG, clique no botão Acionar Dag .
(Opcional) Especifique a configuração de execução do DAG.
Clique em Gatilho .
gcloud
Execute o comando dags trigger
Airflow CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags trigger -- DAG_ID
Substitua o seguinte:
-
ENVIRONMENT_NAME
: o nome do seu ambiente. -
LOCATION
: região onde o ambiente está localizado. -
DAG_ID
: o nome do DAG.
Para obter mais informações sobre como executar comandos CLI do Airflow em ambientes do Cloud Composer, consulte Executando comandos CLI do Airflow .
Para obter mais informações sobre os comandos CLI do Airflow disponíveis, consulte a referência de comando gcloud composer environments run
.
Ver logs e detalhes de execução do DAG
No Google Cloud console, você pode:
- Veja status de execuções anteriores do DAG e detalhes do DAG .
- Explore registros detalhados de todas as execuções de DAG e todas as tarefas desses DAGs.
- Ver estatísticas do DAG .
Além disso, o Cloud Composer fornece acesso à interface do usuário do Airflow , que é a interface web do Airflow.
Pausar um DAG
Console
Para pausar um DAG de Google Cloud console:
No Google Cloud console, vá para a página Ambientes .
Selecione um ambiente para visualizar seus detalhes.
Na página Detalhes do ambiente , acesse a aba DAGs .
Clique no nome de um DAG.
Na página de detalhes do DAG , clique em Pausar DAG .
Interface de usuário do fluxo de ar
Para pausar um DAG na interface do usuário do Airflow:
- No Google Cloud console, vá para a página Ambientes .
Na coluna do servidor web Airflow , siga o link Airflow para seu ambiente.
Faça login com a Conta do Google que tenha as permissões apropriadas.
Na interface da web do Airflow, na página DAGs , clique no botão de alternância ao lado do nome do DAG.
gcloud
Execute o comando dags pause
Airflow CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags pause -- DAG_ID
Substitua o seguinte:
-
ENVIRONMENT_NAME
: o nome do seu ambiente. -
LOCATION
: região onde o ambiente está localizado. -
DAG_ID
: o nome do DAG.
Para obter mais informações sobre como executar comandos CLI do Airflow em ambientes do Cloud Composer, consulte Executando comandos CLI do Airflow .
Para obter mais informações sobre os comandos CLI do Airflow disponíveis, consulte a referência de comando gcloud composer environments run
.