Construindo um Pipeline ETL com Python, SQLAlchemy e PostgreSQL

Construindo um Pipeline ETL com Python, SQLAlchemy e PostgreSQL

# Construindo um Pipeline ETL com Python, SQLAlchemy e PostgreSQL...

5 min de leitura
🔒 Faça login para curtir

Autor

Luis Henrique Cuba

Luis Henrique Cuba

Autor no blog LHCX.

Gostou do conteúdo?

🔒 Faça login para curtir

Sua curtida nos ajuda a melhorar

Construindo um Pipeline ETL com Python, SQLAlchemy e PostgreSQL


Introdução

No cenário atual de dados massivos, as empresas precisam transformar informações brutas em insights acionáveis de forma rápida e confiável. Um pipeline de ETL (Extract, Transform, Load) bem estruturado garante que os dados sejam extraídos de fontes heterogêneas, transformados segundo regras de negócio e carregados em um repositório analítico preparado para consultas.

Este post mostra, passo a passo, como montar um pipeline completo usando apenas ferramentas de código aberto: Python, Pandas, SQLAlchemy e PostgreSQL. Ao final, você terá um script reutilizável, um pequeno job Bash para automação e dicas para escalar o processo.

Tecnologia e Inovação


Por que escolher Python e PostgreSQL para ETL?

  • Python possui bibliotecas maduras para manipulação de dados (pandas, numpy) e integração com bancos (SQLAlchemy, psycopg2).
  • PostgreSQL oferece performance, extensibilidade (JSONB, procedimentos armazenados) e suporte a transações ACID, essencial para cargas confiáveis.
  • A combinação permite escrever todo o fluxo em uma única linguagem, facilitando manutenção e versionamento.

Arquitetura do Pipeline

A arquitetura proposta segue três camadas distintas:

  1. Extração – Conexão com APIs ou arquivos CSV.
  2. Transformação – Limpeza, enriquecimento e agregação usando Pandas.
  3. Carga – Persistência em tabelas dimensionais e fato no PostgreSQL via SQLAlchemy.

Desenvolvimento e Código

Diagrama simplificado

EtapaTecnologiaResponsabilidade
Extractrequests / pandas.read_csvCaptura de dados brutos
TransformpandasNormalização, tipagem, cálculo de métricas
LoadSQLAlchemy + psycopg2Inserção em tabelas alvo

Implementação Passo a Passo

1. Configuração do ambiente

bash

Cria um ambiente virtual isolado

python3 -m venv .venv source .venv/bin/activate

Instala as dependências necessárias

pip install pandas sqlalchemy psycopg2-binary requests

Dica: Use o pip-tools ou poetry para travar versões e garantir reprodutibilidade.

2. Definindo as credenciais de conexão

Crie um arquivo .env (não versionado) com as variáveis abaixo:

DB_HOST=localhost DB_PORT=5432 DB_NAME=analytics DB_USER=etl_user DB_PASS=senha_forte

O script Python lerá essas variáveis usando a biblioteca os.

3. Código Python completo

python import os import requests import pandas as pd from sqlalchemy import create_engine, text from datetime import datetime

--------------------------------------------------

1️⃣ Configurações de conexão

--------------------------------------------------

DB_URL = ( f"postgresql+psycopg2://{os.getenv('DB_USER')}:{os.getenv('DB_PASS')}@" f"{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}" ) engine = create_engine(DB_URL, echo=False)

--------------------------------------------------

2️⃣ Função de extração (API pública de exemplo)

--------------------------------------------------

def extract_sales_data(url: str) -> pd.DataFrame: """Baixa o JSON da API e converte para DataFrame.""" response = requests.get(url, timeout=10) response.raise_for_status() data = response.json() df = pd.json_normalize(data, sep='_') return df

--------------------------------------------------

3️⃣ Transformação: limpeza e enriquecimento

--------------------------------------------------

def transform(df: pd.DataFrame) -> pd.DataFrame: # Normaliza colunas de data df['order_date'] = pd.to_datetime(df['order_date'], utc=True) # Remove linhas com valores nulos críticos df = df.dropna(subset=['order_id', 'customer_id', 'order_total']) # Calcula margem de lucro (exemplo fictício) df['profit'] = df['order_total'] * 0.2 # Cria coluna de ano/mês para particionamento df['year_month'] = df['order_date'].dt.to_period('M').astype(str) return df

--------------------------------------------------

4️⃣ Carga: inserção em tabelas PostgreSQL

--------------------------------------------------

def load(df: pd.DataFrame, table_name: str = 'stg_sales'): """Usa to_sql com método append e transação explícita.""" with engine.begin() as conn: # Garante que a tabela exista (DDL simples) conn.execute(text(f""" CREATE TABLE IF NOT EXISTS {table_name} ( order_id BIGINT PRIMARY KEY, customer_id BIGINT NOT NULL, order_total NUMERIC(12,2), profit NUMERIC(12,2), order_date TIMESTAMP WITH TIME ZONE, year_month VARCHAR(7) ); """)) # Insere os registros df.to_sql(table_name, con=conn, if_exists='append', index=False, method='multi')

--------------------------------------------------

5️⃣ Orquestração simples

--------------------------------------------------

if name == 'main': API_URL = 'https://api.example.com/v1/sales?limit=1000' raw_df = extract_sales_data(API_URL) clean_df = transform(raw_df) load(clean_df) print(f"{datetime.utcnow()} – ETL concluído com {len(clean_df)} registros.")

O código acima pode ser salvo como etl_sales.py e executado manualmente ou via agendador.

4. Automatizando com Bash e Cron

Crie um script run_etl.sh que carrega as variáveis de ambiente e dispara o Python:

bash #!/usr/bin/env bash set -euo pipefail

Carrega variáveis do .env (necessário instalar 'dotenv-cli' ou usar source)

export $(grep -v '^#' .env | xargs)

Executa o ETL

python etl_sales.py

Torne o script executável:

bash chmod +x run_etl.sh

Em seguida, adicione uma entrada ao crontab para rodar diariamente às 02:00 UTC:

cron 0 2 * * * /caminho/para/run_etl.sh >> /var/log/etl_sales.log 2>&1


Exemplos Práticos de Consultas Pós‑Carga

Com os dados já na tabela stg_sales, podemos criar uma view de fatos para análises:

sql CREATE OR REPLACE VIEW vw_sales_monthly AS SELECT year_month, COUNT(*) AS total_orders, SUM(order_total) AS revenue, SUM(profit) AS profit FROM stg_sales GROUP BY year_month ORDER BY year_month DESC;

Agora, no seu BI favorito (Metabase, Superset, Power BI), basta conectar ao PostgreSQL e usar a view acima para dashboards de desempenho.


Conclusão

Construir um pipeline ETL com Python, Pandas e PostgreSQL não requer ferramentas caras nem infraestrutura complexa. O fluxo apresentado cobre:

  • Extração de API JSON;
  • Transformação robusta com tipagem e cálculos de negócio;
  • Carga segura usando SQLAlchemy e transações;
  • Automação via Bash e Cron para execução recorrente.

Próximos passos recomendados:

  1. Versionamento do código com Git e revisão de pull‑requests.
  2. Testes unitários (pytest) para validar funções de transformação.
  3. Orquestração avançada com Apache Airflow ou Prefect, caso o volume de dados cresça.
  4. Monitoramento de jobs usando Prometheus + Grafana ou serviços de observabilidade.

Com essas práticas, seu time de dados ganha agilidade, confiabilidade e escalabilidade para transformar dados brutos em valor de negócio.

Tecnologia e Negócios

Tags

Carregando comentários...