Como Orquestrar ETL com Apache Airflow, Docker e PostgreSQL
Introdução
No cenário atual de Data Engineering, a necessidade de pipelines confiáveis, escaláveis e fáceis de manter tem se tornado crítica para empresas que dependem de decisões orientadas por dados. Entre as ferramentas mais adotadas para orquestração de fluxos de trabalho, o Apache Airflow se destaca por sua flexibilidade e modelo baseado em código Python. Quando combinamos Airflow com Docker e um banco de dados PostgreSQL, criamos um ambiente isolado, reproduzível e pronto para produção.
Neste post vamos percorrer todo o processo de configuração de um pipeline ETL simples: extração de dados de uma API pública, transformação em Python e carregamento no PostgreSQL. Você aprenderá a montar a infraestrutura com Docker Compose, escrever um DAG funcional e aplicar boas práticas de versionamento e monitoramento.
Desenvolvimento
1. Estrutura de diretórios
Organizar o código de forma previsível facilita a colaboração e a manutenção. A seguir, a estrutura recomendada para o projeto:
etl-airflow/ ├── dags/ # DAGs do Airflow │ └── etl_pipeline.py ├── scripts/ # Scripts auxiliares (ex.: conexão ao DB) │ └── db_utils.py ├── docker-compose.yml ├── Dockerfile # Imagem customizada do Airflow └── requirements.txt # Dependências Python
2. Dockerfile customizado
O Airflow oficial já oferece uma imagem pronta, mas é comum precisar de bibliotecas adicionais (por exemplo, psycopg2 para PostgreSQL). O Dockerfile abaixo estende a imagem oficial e instala as dependências necessárias.
dockerfile
Dockerfile
FROM apache/airflow:2.7.2-python3.11
Evita prompts interativos ao instalar pacotes do apt
ENV DEBIAN_FRONTEND=noninteractive
Instala dependências do sistema
RUN apt-get update && apt-get install -y
build-essential
libpq-dev &&
rm -rf /var/lib/apt/lists/*
Copia requirements.txt e instala pacotes Python
COPY requirements.txt /requirements.txt RUN pip install --no-cache-dir -r /requirements.txt
Copia DAGs e scripts para o diretório de trabalho do Airflow
COPY dags/ /opt/airflow/dags/ COPY scripts/ /opt/airflow/scripts/
Define usuário não-root (opcional, mas recomendado)
USER airflow
3. requirements.txt
text apache-airflow==2.7.2 psycopg2-binary==2.9.9 requests==2.31.0 pandas==2.2.0
4. docker‑compose.yml
O docker-compose orquestra três serviços: postgres, airflow‑webserver, airflow‑scheduler. Utilizamos a mesma rede interna para que o Airflow se conecte ao banco sem expor portas externas.
yaml
docker-compose.yml
version: "3.8"
services: postgres: image: postgres:15-alpine environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow_db volumes: - pg_data:/var/lib/postgresql/data healthcheck: test: ["CMD", "pg_isready", "-U", "airflow"] interval: 10s timeout: 5s retries: 5
airflow-webserver: build: . depends_on: postgres: condition: service_healthy environment: AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow_db AIRFLOW__CORE__FERNET_KEY: "$(python -c 'from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())')" _AIRFLOW_DB_UPGRADE: "true" ports: - "8080:8080" command: webserver volumes: - ./dags:/opt/airflow/dags - ./scripts:/opt/airflow/scripts restart: always
airflow-scheduler: build: . depends_on: - airflow-webserver environment: AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow_db command: scheduler volumes: - ./dags:/opt/airflow/dags - ./scripts:/opt/airflow/scripts restart: always
volumes: pg_data:
5. DAG de exemplo: ETL de clima
O DAG abaixo consome a API pública OpenWeatherMap, transforma os dados em um DataFrame e grava na tabela weather do PostgreSQL.
python
dags/etl_pipeline.py
"""DAG de ETL que coleta dados de clima e persiste no PostgreSQL.
Requisitos:
- Airflow >=2.0
- Biblioteca requests
- Biblioteca pandas
- psycopg2-binary para conexão ao banco """
from datetime import datetime, timedelta import json import os import requests import pandas as pd from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.postgres.hooks.postgres import PostgresHook
---------------------------------------------------------------------
Configurações do DAG
---------------------------------------------------------------------
DEFAULT_ARGS = { "owner": "data-engineer", "depends_on_past": False, "retries": 1, "retry_delay": timedelta(minutes=5), }
with DAG( dag_id="weather_etl", default_args=DEFAULT_ARGS, description="Extrai clima de cidades brasileiras e carrega no PostgreSQL", schedule_interval="@daily", start_date=datetime(2024, 1, 1), catchup=False, tags=["etl", "weather", "postgres"], ) as dag:
def extract(**context):
"""Extrai dados da API OpenWeatherMap.
A chave da API deve estar definida na variável de ambiente
`OWM_API_KEY`. Para fins de demonstração, usamos a cidade de
São Paulo (código 3448439).
"""
api_key = os.getenv("OWM_API_KEY")
if not api_key:
raise ValueError("Variável OWM_API_KEY não definida")
url = (
f"https://api.openweathermap.org/data/2.5/weather?"
f"id=3448439&units=metric&appid={api_key}"
)
response = requests.get(url, timeout=10)
response.raise_for_status()
data = response.json()
# Salva JSON bruto para debugging
context["ti"].xcom_push(key="raw_weather", value=data)
return data
def transform(**context):
"""Transforma o JSON bruto em um DataFrame simplificado."""
raw = context["ti"].xcom_pull(key="raw_weather", task_ids="extract")
df = pd.DataFrame({
"city": [raw["name"]],
"temp_c": [raw["main"]["temp"]],
"humidity": [raw["main"]["humidity"]],
"weather": [raw["weather"][0]["description"]],
"timestamp": [datetime.utcfromtimestamp(raw["dt"]).isoformat()],
})
# Passa o DataFrame como JSON para a próxima tarefa
context["ti"].xcom_push(key="weather_df", value=df.to_json(orient="records"))
return df
def load(**context):
"""Carrega o DataFrame no PostgreSQL usando PostgresHook."""
json_str = context["ti"].xcom_pull(key="weather_df", task_ids="transform")
df = pd.read_json(json_str)
# Conexão via PostgresHook (configurada no Airflow UI ou env vars)
pg_hook = PostgresHook(postgres_conn_id="postgres_default")
# Cria a tabela se não existir
create_sql = """
CREATE TABLE IF NOT EXISTS weather (
city TEXT,
temp_c DOUBLE PRECISION,
humidity INTEGER,
weather TEXT,
ts TIMESTAMP
);
"""
pg_hook.run(create_sql)
# Insere linhas
for _, row in df.iterrows():
insert_sql = """
INSERT INTO weather (city, temp_c, humidity, weather, ts)
VALUES (%s, %s, %s, %s, %s);
"""
pg_hook.run(insert_sql, parameters=(
row["city"], row["temp_c"], row["humidity"], row["weather"], row["timestamp"]
))
return "Loaded {} rows".format(len(df))
extract_task = PythonOperator(
task_id="extract",
python_callable=extract,
provide_context=True,
)
transform_task = PythonOperator(
task_id="transform",
python_callable=transform,
provide_context=True,
)
load_task = PythonOperator(
task_id="load",
python_callable=load,
provide_context=True,
)
extract_task >> transform_task >> load_task
6. Configurando a conexão no Airflow UI
- Acesse
http://localhost:8080. - No menu Admin > Connections, clique +.
- Preencha:
- Conn Id:
postgres_default - Conn Type:
Postgres - Host:
postgres - Schema:
airflow_db - Login:
airflow - Password:
airflow - Port:
5432
- Conn Id:
- Salve.
Com a conexão criada, o DAG já pode executar as três etapas sem ajustes adicionais.
Exemplos Práticos
6.1. Testando localmente sem Docker
Durante o desenvolvimento, pode ser útil validar as funções de extract e transform fora do Airflow. Basta criar um script test_etl.py:
python
test_etl.py
import os from dags.etl_pipeline import extract, transform
os.environ["OWM_API_KEY"] = "SUA_CHAVE_AQUI"
raw = extract() print("Raw JSON:", raw)
df = transform() print("DataFrame resultante:") print(df)
Execute python test_etl.py e verifique a saída. Essa prática reduz o tempo de depuração, já que o Airflow adiciona overhead de agendamento.
6.2. Monitoramento básico com Airflow UI
- Tree View: visualiza o histórico de execuções e identifica falhas rapidamente.
- Log: cada tarefa gera logs detalhados; clique na tarefa para abrir o console.
- Graph View: mostra dependências entre tarefas, útil ao expandir o pipeline.
6.3. Escalando com Celery Executor
Para pipelines de maior volume, troque o LocalExecutor por CeleryExecutor e adicione workers ao docker‑compose. O código do DAG permanece inalterado, demonstrando a vantagem de separar lógica de orquestração da infraestrutura.
Conclusão
Neste post construímos, passo a passo, um pipeline ETL completo usando Apache Airflow, Docker e PostgreSQL. Você aprendeu a:
- Estruturar o projeto de forma modular;
- Criar imagens Docker customizadas com dependências adicionais;
- Definir serviços com Docker Compose, incluindo health‑checks;
- Escrever um DAG que extrai, transforma e carrega dados de forma idempotente;
- Configurar a conexão ao banco via Airflow UI e monitorar execuções.
Próximos passos recomendados:
- Integrar testes unitários com
pytesteairflow‑test; - Implementar alertas via Slack ou e‑mail usando
EmailOperator; - Migrar o executor para Celery ou Kubernetes quando a carga crescer.
Com essa base, você está pronto para expandir o pipeline, adicionar novas fontes (S3, Kafka) e aplicar boas práticas de CI/CD para pipelines de dados.
