Agendar e acionar DAGs de fluxo de ar

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página explica como o agendamento e o acionamento de DAG funcionam no Airflow, como definir um agendamento para um DAG e como acionar um DAG manualmente ou pausá-lo.

Sobre DAGs de fluxo de ar no Cloud Composer

Os DAGs do Airflow no Cloud Composer são executados em um ou mais ambientes do Cloud Composer no seu projeto. Você carrega os arquivos de origem dos seus DAGs do Airflow para um bucket do Cloud Storage associado a um ambiente. A instância do Airflow no ambiente analisa esses arquivos e agenda as execuções dos DAGs, conforme definido pela agenda de cada DAG. Durante uma execução de DAG, o Airflow agenda e executa tarefas individuais que compõem um DAG em uma sequência definida pelo DAG.

Para saber mais sobre os principais conceitos do Airflow, como DAGs do Airflow, execuções de DAG, tarefas ou operadores, consulte a página Conceitos principais na documentação do Airflow.

Sobre o agendamento de DAG no Airflow

O Airflow fornece os seguintes conceitos para seu mecanismo de agendamento:

Data lógica

Representa uma data para a qual uma execução DAG específica é executada .

Esta não é a data real em que o Airflow executa um DAG, mas um período de tempo que uma execução específica de DAG deve processar. Por exemplo, para um DAG programado para ser executado todos os dias às 12h, a data lógica também seria 12h de um dia específico. Como ele é executado duas vezes por dia, o período de tempo que ele deve processar são as últimas 12 horas. Ao mesmo tempo, a lógica definida no próprio DAG pode não usar a data lógica ou o intervalo de tempo. Por exemplo, um DAG pode executar o mesmo script uma vez por dia sem usar o valor da data lógica.

Nas versões do Airflow anteriores à 2.2, essa data é chamada de data de execução .

Data de execução

Representa uma data em que uma execução DAG específica é executada .

Por exemplo, para um DAG programado para ser executado todos os dias às 12:00, a execução real do DAG pode ocorrer às 12:05, algum tempo depois da data lógica.

Intervalo de programação

Representa quando e com que frequência um DAG deve ser executado, em termos de datas lógicas.

Por exemplo, uma programação diária significa que um DAG é executado uma vez por dia, e as datas lógicas para suas execuções de DAG têm intervalos de 24 horas.

Data de início

Especifica quando você deseja que o Airflow comece a agendar seu DAG.

As tarefas no seu DAG podem ter datas de início individuais ou você pode especificar uma única data de início para todas as tarefas. Com base na data mínima de início para as tarefas no seu DAG e no intervalo de agendamento, o Airflow agenda as execuções do DAG.

Recuperação, preenchimento e novas tentativas

Mecanismos para executar execuções de DAG para datas passadas.

O Catchup executa execuções de DAG que ainda não foram executadas, por exemplo, se o DAG foi pausado por um longo período e depois reativado. Você pode usar o preenchimento automático para executar execuções de DAG para um determinado intervalo de datas. As tentativas especificam quantas tentativas o Airflow deve fazer ao executar tarefas de um DAG.

O agendamento funciona da seguinte maneira:

  1. Após a data de início, o Airflow aguarda a próxima ocorrência do intervalo de programação.

  2. O Airflow programa a primeira execução do DAG para acontecer no final deste intervalo de programação.

    Por exemplo, se um DAG estiver programado para ser executado a cada hora e a data de início for às 12:00 de hoje, a primeira execução do DAG ocorrerá às 13:00 de hoje.

A seção "Agendar um DAG do Airflow" neste documento descreve como configurar o agendamento para seus DAGs usando esses conceitos. Para obter mais informações sobre execuções e agendamento de DAGs, consulte "Execuções de DAGs" na documentação do Airflow.

Sobre maneiras de acionar um DAG

O Airflow oferece as seguintes maneiras de acionar um DAG:

  • Acionar de acordo com uma programação . O Airflow aciona o DAG automaticamente com base na programação especificada para ele no arquivo DAG.

  • Acionar manualmente . Você pode acionar um DAG manualmente a partir deGoogle Cloud console, interface do usuário do Airflow ou executando um comando CLI do Airflow no Google Cloud CLI.

  • Acionar em resposta a eventos. A maneira padrão de acionar um DAG em resposta a eventos é usar um sensor .

Outras maneiras de acionar DAGs:

Antes de começar

  • Certifique-se de que sua conta tenha uma função que possa gerenciar objetos nos buckets de ambiente e visualizar e acionar DAGs. Para obter mais informações, consulte Controle de acesso .

Agende um DAG de fluxo de ar

Defina um agendamento para um DAG no arquivo DAG. Edite a definição do DAG da seguinte maneira:

  1. Localize e edite o arquivo DAG no seu computador. Se você não tiver o arquivo DAG, poderá baixar uma cópia dele no bucket do ambiente . Para um novo DAG, você pode definir todos os parâmetros ao criá-lo.

  2. No parâmetro schedule_interval , defina o agendamento. Você pode usar uma expressão Cron, como 0 0 * * * , ou uma predefinição, como @daily . Para mais informações, consulte Cron e Intervalos de Tempo na documentação do Airflow.

    O Airflow determina datas lógicas para execuções de DAG com base na programação definida por você.

  3. No parâmetro start_date , defina a data de início.

    O Airflow determina a data lógica da primeira execução do DAG usando este parâmetro.

  4. (Opcional) No parâmetro catchup , defina se o Airflow deve executar todas as execuções anteriores deste DAG, da data de início até a data atual, que ainda não foram executadas.

    As execuções de DAG executadas durante a recuperação terão sua data lógica no passado e sua data de execução refletirá o momento em que a execução de DAG foi realmente executada.

  5. (Opcional) No parâmetro retries , defina quantas vezes o Airflow deve tentar novamente as tarefas que falharam (cada DAG consiste em uma ou mais tarefas individuais). Por padrão, as tarefas no Cloud Composer são tentadas novamente duas vezes.

  6. Carregue a nova versão do DAG para o bucket do ambiente.

  7. Aguarde até que o Airflow analise o DAG com sucesso. Por exemplo, você pode verificar a lista de DAGs em seu ambiente noGoogle Cloud console ou na interface do usuário do Airflow .

O exemplo de definição de DAG a seguir é executado duas vezes por dia, às 00:00 e às 12:00. Sua data de início é definida como 1º de janeiro de 2024, mas o Airflow não o executa para datas passadas após o upload ou a pausa porque a recuperação está desabilitada.

O DAG contém uma tarefa chamada insert_query_job , que insere uma linha em uma tabela com o operador BigQueryInsertJobOperator . Este operador é um dos Google Cloud Operadores do BigQuery , que você pode usar para gerenciar conjuntos de dados e tabelas, executar consultas e validar dados. Se uma execução específica dessa tarefa falhar, o Airflow a tentará novamente mais quatro vezes com o intervalo de repetição padrão. A data lógica para essas tentativas permanece a mesma.

A consulta SQL para esta linha usa modelos do Airflow para gravar a data lógica e o nome do DAG na linha.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

Para testar este DAG, você pode acioná-lo manualmente e depois visualizar os logs de execução da tarefa .

Mais exemplos de parâmetros de agendamento

Os exemplos de parâmetros de agendamento a seguir ilustram como o agendamento funciona com diferentes combinações de parâmetros:

  • Se start_date for datetime(2024, 4, 4, 16, 25) e schedule_interval for 30 16 * * * , a primeira execução do DAG ocorrerá às 16:30 do dia 5 de abril de 2024.

  • Se start_date for datetime(2024, 4, 4, 16, 35) e schedule_interval for 30 16 * * * , a primeira execução do DAG ocorrerá às 16h30 do dia 6 de abril de 2024. Como a data de início é posterior ao intervalo de agendamento em 4 de abril de 2024, a execução do DAG não ocorrerá em 5 de abril de 2024. Em vez disso, o intervalo de agendamento termina às 16h35 do dia 5 de abril de 2024, portanto, a próxima execução do DAG será agendada para as 16h30 do dia seguinte.

  • Se start_date for datetime(2024, 4, 4) e schedule_interval for @daily , a primeira execução do DAG será agendada para 00:00 em 5 de abril de 2024.

  • Se start_date for datetime(2024, 4, 4, 16, 30) e o schedule_interval for 0 * * * * , a primeira execução do DAG será agendada para as 18h do dia 4 de abril de 2024. Após a data e a hora especificadas, o Airflow agendará uma execução do DAG para o minuto 0 de cada hora. O momento mais próximo em que isso acontece é às 17h. Nesse momento, o Airflow agendará uma execução do DAG para o final do intervalo de agendamento, ou seja, às 18h.

Acionar um DAG manualmente

Quando você aciona manualmente um DAG do Airflow, o Airflow executa o DAG uma vez, independentemente da programação especificada no arquivo DAG.

Console

Para acionar um DAG de Google Cloud console:

  1. No Google Cloud console, vá para a página Ambientes .

    Ir para Ambientes

  2. Selecione um ambiente para visualizar seus detalhes.

  3. Na página Detalhes do ambiente , acesse a aba DAGs .

  4. Clique no nome de um DAG.

  5. Na página de detalhes do DAG , clique em Acionar DAG . Uma nova execução de DAG será criada.

Interface de usuário do fluxo de ar

Para acionar um DAG na interface do usuário do Airflow:

  1. No Google Cloud console, vá para a página Ambientes .

    Ir para Ambientes

  2. Na coluna do servidor web Airflow , siga o link Airflow para seu ambiente.

  3. Faça login com a Conta do Google que tenha as permissões apropriadas.

  4. Na interface da web do Airflow, na página DAGs , na coluna Links do seu DAG, clique no botão Acionar Dag .

  5. (Opcional) Especifique a configuração de execução do DAG.

  6. Clique em Gatilho .

gcloud

Execute o comando dags trigger Airflow CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

Substitua o seguinte:

  • ENVIRONMENT_NAME : o nome do seu ambiente.
  • LOCATION : região onde o ambiente está localizado.
  • DAG_ID : o nome do DAG.

Para obter mais informações sobre como executar comandos CLI do Airflow em ambientes do Cloud Composer, consulte Executando comandos CLI do Airflow .

Para obter mais informações sobre os comandos CLI do Airflow disponíveis, consulte a referência de comando gcloud composer environments run .

Ver logs e detalhes de execução do DAG

No Google Cloud console, você pode:

Além disso, o Cloud Composer fornece acesso à interface do usuário do Airflow , que é a interface web do Airflow.

Pausar um DAG

Console

Para pausar um DAG de Google Cloud console:

  1. No Google Cloud console, vá para a página Ambientes .

    Ir para Ambientes

  2. Selecione um ambiente para visualizar seus detalhes.

  3. Na página Detalhes do ambiente , acesse a aba DAGs .

  4. Clique no nome de um DAG.

  5. Na página de detalhes do DAG , clique em Pausar DAG .

Interface de usuário do fluxo de ar

Para pausar um DAG na interface do usuário do Airflow:

  1. No Google Cloud console, vá para a página Ambientes .

Ir para Ambientes

  1. Na coluna do servidor web Airflow , siga o link Airflow para seu ambiente.

  2. Faça login com a Conta do Google que tenha as permissões apropriadas.

  3. Na interface da web do Airflow, na página DAGs , clique no botão de alternância ao lado do nome do DAG.

gcloud

Execute o comando dags pause Airflow CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

Substitua o seguinte:

  • ENVIRONMENT_NAME : o nome do seu ambiente.
  • LOCATION : região onde o ambiente está localizado.
  • DAG_ID : o nome do DAG.

Para obter mais informações sobre como executar comandos CLI do Airflow em ambientes do Cloud Composer, consulte Executando comandos CLI do Airflow .

Para obter mais informações sobre os comandos CLI do Airflow disponíveis, consulte a referência de comando gcloud composer environments run .

O que vem a seguir