Desenvolvimento Web

Apache Airflow com Python: Como Orquestrar Pipelines de Dados na Pratica

Apache Airflow com Python: Como Orquestrar Pipelines de Dados na Pratica

Apache Airflow e o orquestrador de pipelines de dados mais popular do mercado. Criado pelo Airbnb e agora projeto Apache, e usado por empresas como Spotify, Twitter, Nubank e iFood para automatizar fluxos de ETL, machine learning e integracao de dados. Este guia mostra como comecar com Airflow usando Python.

O que e orquestracao de dados

Orquestracao e o processo de coordenar a execucao de tarefas interdependentes em uma sequencia definida. Em pipelines de dados: extrair dados de uma API, transformar com Python/Pandas, carregar no banco de dados, rodar um modelo de ML, enviar relatorio por e-mail. Cada etapa depende da anterior. O orquestrador garante a execucao na ordem correta, trata falhas, faz retentativas e notifica em caso de problemas.

Antes do Airflow, isso era feito com cron jobs e scripts bash — dificil de monitorar, sem tratamento de falhas e impossível de escalar.

Conceitos fundamentais do Airflow

DAG (Directed Acyclic Graph): e o pipeline definido em Python. Cada DAG tem um conjunto de tasks (tarefas) com dependencias entre elas. A execucao respeita a ordem topologica do grafo.

Task: uma unidade de trabalho dentro de uma DAG. Pode ser um script Python, um comando bash, uma query SQL, uma chamada de API.

Operator: define o tipo de task. BashOperator executa comandos shell, PythonOperator executa funcoes Python, SqliteOperator executa queries SQL, HttpOperator faz chamadas HTTP.

Schedule: define quando a DAG roda. Pode ser intervalos fixos (@daily, @hourly), expressoes cron (0 6 * * 1-5 para dias uteis as 6h), ou trigger manual.

Instalacao e setup

A forma mais simples de instalar o Airflow localmente: pip install apache-airflow. Depois: airflow db init (inicializa o banco de metadados), airflow users create (cria usuario admin), airflow webserver –port 8080 (inicia a interface web), airflow scheduler (inicia o scheduler em outro terminal). A interface web em localhost:8080 mostra todas as DAGs, historico de execucao e logs.

Para producao, o Airflow roda em Docker ou Kubernetes. A imagem oficial apache/airflow no Docker Hub vem pronta com docker-compose para subir webserver, scheduler, worker e banco PostgreSQL.

Criando sua primeira DAG

Uma DAG basica que extrai dados de uma API e salva num CSV:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import pandas as pd

default_args = {
“owner”: “data-team”,
“retries”: 3,
“retry_delay”: timedelta(minutes=5),
“email_on_failure”: True,
“email”: [“alerta@empresa.com”],
}

def extrair_dados(**kwargs):
response = requests.get(“https://api.exemplo.com/vendas”, timeout=30)
dados = response.json()
df = pd.DataFrame(dados)
df.to_csv(“/tmp/vendas_brutas.csv”, index=False)
return len(dados)

def transformar_dados(**kwargs):
df = pd.read_csv(“/tmp/vendas_brutas.csv”)
df[“data”] = pd.to_datetime(df[“data”])
df[“mes”] = df[“data”].dt.month
df[“receita”] = df[“quantidade”] * df[“preco_unitario”]
df.to_csv(“/tmp/vendas_transformadas.csv”, index=False)

def carregar_dados(**kwargs):
df = pd.read_csv(“/tmp/vendas_transformadas.csv”)
# Inserir no banco de dados
from sqlalchemy import create_engine
engine = create_engine(“postgresql://user:pass@host/db”)
df.to_sql(“vendas_processadas”, engine, if_exists=”append”, index=False)

with DAG(
dag_id=”pipeline_vendas”,
default_args=default_args,
description=”Pipeline ETL de vendas diario”,
schedule_interval=”@daily”,
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
t1 = PythonOperator(task_id=”extrair”, python_callable=extrair_dados)
t2 = PythonOperator(task_id=”transformar”, python_callable=transformar_dados)
t3 = PythonOperator(task_id=”carregar”, python_callable=carregar_dados)
t1 >> t2 >> t3

Boas praticas com Airflow

Idempotencia: cada task deve produzir o mesmo resultado se executada multiplas vezes. Isso permite re-execucao segura em caso de falhas.

XCom para comunicacao entre tasks: use kwargs[“ti”].xcom_push e xcom_pull para passar dados pequenos entre tasks. Para dados grandes, salve em storage (S3, GCS) e passe apenas o path.

Sensores: SensorOperators esperam uma condicao antes de prosseguir (ex: S3KeySensor espera um arquivo aparecer no S3 antes de iniciar o processamento).

Paralelismo: tasks sem dependencia entre si podem rodar em paralelo. Configure o pool e o nivel de paralelismo para controlar a carga.

Airflow na nuvem

Para producao sem gerenciar infraestrutura: Google Cloud Composer (Airflow gerenciado no GCP), Amazon MWAA (Managed Workflows for Apache Airflow na AWS) e Astronomer (plataforma especializada em Airflow gerenciado). Todas oferecem Airflow pre-configurado sem preocupacao com escalabilidade, banco de metadados ou atualizacoes de versao.

Tem um projeto em mente?

Somos especialistas em transformar ideias em produtos digitais. Apps, sites, automações e IA — vamos construir juntos.

Resposta rápida Orçamento sem compromisso +100 projetos entregues
Compartilhar: