Construindo um Pipeline ETL com Python, SQLAlchemy e PostgreSQL
Introdução
No cenário atual de dados massivos, as empresas precisam transformar informações brutas em insights acionáveis de forma rápida e confiável. Um pipeline de ETL (Extract, Transform, Load) bem estruturado garante que os dados sejam extraídos de fontes heterogêneas, transformados segundo regras de negócio e carregados em um repositório analítico preparado para consultas.
Este post mostra, passo a passo, como montar um pipeline completo usando apenas ferramentas de código aberto: Python, Pandas, SQLAlchemy e PostgreSQL. Ao final, você terá um script reutilizável, um pequeno job Bash para automação e dicas para escalar o processo.
Por que escolher Python e PostgreSQL para ETL?
- Python possui bibliotecas maduras para manipulação de dados (pandas, numpy) e integração com bancos (SQLAlchemy, psycopg2).
- PostgreSQL oferece performance, extensibilidade (JSONB, procedimentos armazenados) e suporte a transações ACID, essencial para cargas confiáveis.
- A combinação permite escrever todo o fluxo em uma única linguagem, facilitando manutenção e versionamento.
Arquitetura do Pipeline
A arquitetura proposta segue três camadas distintas:
- Extração – Conexão com APIs ou arquivos CSV.
- Transformação – Limpeza, enriquecimento e agregação usando Pandas.
- Carga – Persistência em tabelas dimensionais e fato no PostgreSQL via SQLAlchemy.
Diagrama simplificado
| Etapa | Tecnologia | Responsabilidade |
|---|---|---|
| Extract | requests / pandas.read_csv | Captura de dados brutos |
| Transform | pandas | Normalização, tipagem, cálculo de métricas |
| Load | SQLAlchemy + psycopg2 | Inserção em tabelas alvo |
Implementação Passo a Passo
1. Configuração do ambiente
bash
Cria um ambiente virtual isolado
python3 -m venv .venv source .venv/bin/activate
Instala as dependências necessárias
pip install pandas sqlalchemy psycopg2-binary requests
Dica: Use o
pip-toolsoupoetrypara travar versões e garantir reprodutibilidade.
2. Definindo as credenciais de conexão
Crie um arquivo .env (não versionado) com as variáveis abaixo:
DB_HOST=localhost DB_PORT=5432 DB_NAME=analytics DB_USER=etl_user DB_PASS=senha_forte
O script Python lerá essas variáveis usando a biblioteca os.
3. Código Python completo
python import os import requests import pandas as pd from sqlalchemy import create_engine, text from datetime import datetime
--------------------------------------------------
1️⃣ Configurações de conexão
--------------------------------------------------
DB_URL = ( f"postgresql+psycopg2://{os.getenv('DB_USER')}:{os.getenv('DB_PASS')}@" f"{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}" ) engine = create_engine(DB_URL, echo=False)
--------------------------------------------------
2️⃣ Função de extração (API pública de exemplo)
--------------------------------------------------
def extract_sales_data(url: str) -> pd.DataFrame: """Baixa o JSON da API e converte para DataFrame.""" response = requests.get(url, timeout=10) response.raise_for_status() data = response.json() df = pd.json_normalize(data, sep='_') return df
--------------------------------------------------
3️⃣ Transformação: limpeza e enriquecimento
--------------------------------------------------
def transform(df: pd.DataFrame) -> pd.DataFrame: # Normaliza colunas de data df['order_date'] = pd.to_datetime(df['order_date'], utc=True) # Remove linhas com valores nulos críticos df = df.dropna(subset=['order_id', 'customer_id', 'order_total']) # Calcula margem de lucro (exemplo fictício) df['profit'] = df['order_total'] * 0.2 # Cria coluna de ano/mês para particionamento df['year_month'] = df['order_date'].dt.to_period('M').astype(str) return df
--------------------------------------------------
4️⃣ Carga: inserção em tabelas PostgreSQL
--------------------------------------------------
def load(df: pd.DataFrame, table_name: str = 'stg_sales'):
"""Usa to_sql com método append e transação explícita."""
with engine.begin() as conn:
# Garante que a tabela exista (DDL simples)
conn.execute(text(f"""
CREATE TABLE IF NOT EXISTS {table_name} (
order_id BIGINT PRIMARY KEY,
customer_id BIGINT NOT NULL,
order_total NUMERIC(12,2),
profit NUMERIC(12,2),
order_date TIMESTAMP WITH TIME ZONE,
year_month VARCHAR(7)
);
"""))
# Insere os registros
df.to_sql(table_name, con=conn, if_exists='append', index=False, method='multi')
--------------------------------------------------
5️⃣ Orquestração simples
--------------------------------------------------
if name == 'main': API_URL = 'https://api.example.com/v1/sales?limit=1000' raw_df = extract_sales_data(API_URL) clean_df = transform(raw_df) load(clean_df) print(f"{datetime.utcnow()} – ETL concluído com {len(clean_df)} registros.")
O código acima pode ser salvo como etl_sales.py e executado manualmente ou via agendador.
4. Automatizando com Bash e Cron
Crie um script run_etl.sh que carrega as variáveis de ambiente e dispara o Python:
bash #!/usr/bin/env bash set -euo pipefail
Carrega variáveis do .env (necessário instalar 'dotenv-cli' ou usar source)
export $(grep -v '^#' .env | xargs)
Executa o ETL
python etl_sales.py
Torne o script executável:
bash chmod +x run_etl.sh
Em seguida, adicione uma entrada ao crontab para rodar diariamente às 02:00 UTC:
cron 0 2 * * * /caminho/para/run_etl.sh >> /var/log/etl_sales.log 2>&1
Exemplos Práticos de Consultas Pós‑Carga
Com os dados já na tabela stg_sales, podemos criar uma view de fatos para análises:
sql CREATE OR REPLACE VIEW vw_sales_monthly AS SELECT year_month, COUNT(*) AS total_orders, SUM(order_total) AS revenue, SUM(profit) AS profit FROM stg_sales GROUP BY year_month ORDER BY year_month DESC;
Agora, no seu BI favorito (Metabase, Superset, Power BI), basta conectar ao PostgreSQL e usar a view acima para dashboards de desempenho.
Conclusão
Construir um pipeline ETL com Python, Pandas e PostgreSQL não requer ferramentas caras nem infraestrutura complexa. O fluxo apresentado cobre:
- Extração de API JSON;
- Transformação robusta com tipagem e cálculos de negócio;
- Carga segura usando SQLAlchemy e transações;
- Automação via Bash e Cron para execução recorrente.
Próximos passos recomendados:
- Versionamento do código com Git e revisão de pull‑requests.
- Testes unitários (pytest) para validar funções de transformação.
- Orquestração avançada com Apache Airflow ou Prefect, caso o volume de dados cresça.
- Monitoramento de jobs usando Prometheus + Grafana ou serviços de observabilidade.
Com essas práticas, seu time de dados ganha agilidade, confiabilidade e escalabilidade para transformar dados brutos em valor de negócio.



