Desenvolvimento Web

ETL Moderno com Python: Como Extrair, Transformar e Carregar Dados

ETL Moderno com Python: Como Extrair, Transformar e Carregar Dados

ETL (Extract, Transform, Load) e o processo fundamental de engenharia de dados: extrair dados de diversas fontes, transforma-los em um formato util e carrega-los em um destino para analise. Este guia mostra como construir pipelines ETL robustos com Python em 2026.

ETL vs ELT: qual a diferenca

ETL tradicional: dados sao extraidos, transformados fora do banco de dados (em um servidor ou script) e depois carregados no destino ja transformados. Modelo dominante ate 2015.

ELT moderno: dados sao extraidos e carregados no warehouse em formato bruto, e a transformacao acontece dentro do warehouse usando SQL (com ferramentas como dbt). Modelo dominante em 2026 com warehouses cloud poderosos (BigQuery, Snowflake, Redshift).

Na pratica, a maioria dos pipelines em 2026 usa uma combinacao: Python para E e L (extract e load), e dbt/SQL para T (transform) dentro do warehouse. Mas saber fazer ETL completo em Python continua essencial para fontes nao-estruturadas, APIs complexas e pre-processamento.

Extract: buscando dados de fontes diversas

APIs REST com requests

A maioria das fontes modernas expoe dados via API REST. O padrao:

import requests
import time

def extrair_api_paginada(url_base, headers=None, max_pages=100):
todos_os_dados = []
page = 1

while page <= max_pages:
response = requests.get(f"{url_base}?page={page}&per_page=100", headers=headers, timeout=30)
response.raise_for_status()
dados = response.json()

if not dados:
break

todos_os_dados.extend(dados)
page += 1
time.sleep(0.5) # respeitar rate limits

return todos_os_dados

Bancos de dados com SQLAlchemy

from sqlalchemy import create_engine
import pandas as pd

engine = create_engine(“postgresql://user:pass@host:5432/db”)
df = pd.read_sql(“SELECT * FROM clientes WHERE atualizado_em > %s”, engine, params=[ultima_extracao])

Arquivos CSV, JSON e Parquet

Pandas le praticamente qualquer formato: pd.read_csv, pd.read_json, pd.read_parquet, pd.read_excel. Para arquivos grandes, use chunksize para processar em lotes: for chunk in pd.read_csv(“grande.csv”, chunksize=10000).

Transform: limpeza e enriquecimento

As transformacoes mais comuns em pipelines de dados:

Limpeza de tipos: converter strings em datas, numeros, categorias. Remover valores invalidos e duplicatas:

df[“data_nascimento”] = pd.to_datetime(df[“data_nascimento”], errors=”coerce”)
df[“cpf”] = df[“cpf”].str.replace(r”[^0-9]”, “”, regex=True)
df = df.drop_duplicates(subset=[“id_cliente”])
df = df.dropna(subset=[“email”]) # remove registros sem email

Validacao de dados:

assert df[“valor”].min() >= 0, “Valores negativos encontrados!”
assert df[“email”].str.contains(“@”).all(), “Emails invalidos encontrados!”

Enriquecimento: adicionar colunas derivadas, geocodificar enderecos, classificar com regras de negocio:

df[“faixa_etaria”] = pd.cut(df[“idade”], bins=[0, 18, 30, 45, 60, 120], labels=[“menor”, “jovem”, “adulto”, “maduro”, “senior”])
df[“regiao”] = df[“estado”].map({“SP”: “Sudeste”, “RJ”: “Sudeste”, “MG”: “Sudeste”, “RS”: “Sul”})

Load: carregando no destino

Para bancos de dados SQL, o pandas to_sql e a forma mais simples:

df.to_sql(“tabela_destino”, engine, if_exists=”append”, index=False, method=”multi”, chunksize=1000)

O parametro if_exists controla o comportamento: “fail” (erro se tabela existe), “replace” (recria a tabela), “append” (insere novos registros).

Para data lakes, salve em Parquet (formato colunar comprimido, padrao da industria):

df.to_parquet(“s3://meu-datalake/vendas/2026/01/vendas_20260101.parquet”, engine=”pyarrow”)

Tratamento de erros e logs

Pipelines de producao precisam de tratamento robusto:

import logging

logging.basicConfig(level=logging.INFO, format=”%(asctime)s – %(levelname)s – %(message)s”)
logger = logging.getLogger(__name__)

def pipeline_vendas():
try:
logger.info(“Iniciando extracao…”)
dados = extrair_api_paginada(“https://api.exemplo.com/vendas”)
logger.info(f”Extraidos {len(dados)} registros”)

df = pd.DataFrame(dados)
df_limpo = transformar(df)
logger.info(f”Apos limpeza: {len(df_limpo)} registros validos”)

carregar(df_limpo)
logger.info(“Pipeline concluido com sucesso”)
except requests.RequestException as e:
logger.error(f”Erro na extracao: {e}”)
raise
except Exception as e:
logger.error(f”Erro inesperado: {e}”)
raise

Ferramentas de ETL em 2026

Para quem prefere nao codar tudo do zero: Airbyte (open source) para E e L com centenas de conectores pre-prontos. Fivetran (SaaS pago) para E e L totalmente gerenciado. dbt para T dentro do warehouse. Airflow para orquestrar a execucao de todas as etapas.

Tem um projeto em mente?

Somos especialistas em transformar ideias em produtos digitais. Apps, sites, automações e IA — vamos construir juntos.

Resposta rápida Orçamento sem compromisso +100 projetos entregues
Compartilhar: