Como Orquestrar ETL com Apache Airflow, Docker e PostgreSQL

Como Orquestrar ETL com Apache Airflow, Docker e PostgreSQL

# Como Orquestrar ETL com Apache Airflow, Docker e PostgreSQL...

6 min de leitura
🔒 Faça login para curtir

Autor

Luis Henrique Cuba

Luis Henrique Cuba

Autor no blog LHCX.

Gostou do conteúdo?

🔒 Faça login para curtir

Sua curtida nos ajuda a melhorar

Como Orquestrar ETL com Apache Airflow, Docker e PostgreSQL

Tecnologia e Inovação

Introdução

No cenário atual de Data Engineering, a necessidade de pipelines confiáveis, escaláveis e fáceis de manter tem se tornado crítica para empresas que dependem de decisões orientadas por dados. Entre as ferramentas mais adotadas para orquestração de fluxos de trabalho, o Apache Airflow se destaca por sua flexibilidade e modelo baseado em código Python. Quando combinamos Airflow com Docker e um banco de dados PostgreSQL, criamos um ambiente isolado, reproduzível e pronto para produção.

Neste post vamos percorrer todo o processo de configuração de um pipeline ETL simples: extração de dados de uma API pública, transformação em Python e carregamento no PostgreSQL. Você aprenderá a montar a infraestrutura com Docker Compose, escrever um DAG funcional e aplicar boas práticas de versionamento e monitoramento.

Desenvolvimento

1. Estrutura de diretórios

Organizar o código de forma previsível facilita a colaboração e a manutenção. A seguir, a estrutura recomendada para o projeto:

etl-airflow/ ├── dags/ # DAGs do Airflow │ └── etl_pipeline.py ├── scripts/ # Scripts auxiliares (ex.: conexão ao DB) │ └── db_utils.py ├── docker-compose.yml ├── Dockerfile # Imagem customizada do Airflow └── requirements.txt # Dependências Python

2. Dockerfile customizado

O Airflow oficial já oferece uma imagem pronta, mas é comum precisar de bibliotecas adicionais (por exemplo, psycopg2 para PostgreSQL). O Dockerfile abaixo estende a imagem oficial e instala as dependências necessárias.

dockerfile

Dockerfile

FROM apache/airflow:2.7.2-python3.11

Evita prompts interativos ao instalar pacotes do apt

ENV DEBIAN_FRONTEND=noninteractive

Instala dependências do sistema

RUN apt-get update && apt-get install -y
build-essential
libpq-dev &&
rm -rf /var/lib/apt/lists/*

Copia requirements.txt e instala pacotes Python

COPY requirements.txt /requirements.txt RUN pip install --no-cache-dir -r /requirements.txt

Copia DAGs e scripts para o diretório de trabalho do Airflow

COPY dags/ /opt/airflow/dags/ COPY scripts/ /opt/airflow/scripts/

Define usuário não-root (opcional, mas recomendado)

USER airflow

3. requirements.txt

text apache-airflow==2.7.2 psycopg2-binary==2.9.9 requests==2.31.0 pandas==2.2.0

4. docker‑compose.yml

O docker-compose orquestra três serviços: postgres, airflow‑webserver, airflow‑scheduler. Utilizamos a mesma rede interna para que o Airflow se conecte ao banco sem expor portas externas.

yaml

docker-compose.yml

version: "3.8"

services: postgres: image: postgres:15-alpine environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow_db volumes: - pg_data:/var/lib/postgresql/data healthcheck: test: ["CMD", "pg_isready", "-U", "airflow"] interval: 10s timeout: 5s retries: 5

airflow-webserver: build: . depends_on: postgres: condition: service_healthy environment: AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow_db AIRFLOW__CORE__FERNET_KEY: "$(python -c 'from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())')" _AIRFLOW_DB_UPGRADE: "true" ports: - "8080:8080" command: webserver volumes: - ./dags:/opt/airflow/dags - ./scripts:/opt/airflow/scripts restart: always

airflow-scheduler: build: . depends_on: - airflow-webserver environment: AIRFLOW__CORE__EXECUTOR: LocalExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow_db command: scheduler volumes: - ./dags:/opt/airflow/dags - ./scripts:/opt/airflow/scripts restart: always

volumes: pg_data:

5. DAG de exemplo: ETL de clima

O DAG abaixo consome a API pública OpenWeatherMap, transforma os dados em um DataFrame e grava na tabela weather do PostgreSQL.

python

dags/etl_pipeline.py

"""DAG de ETL que coleta dados de clima e persiste no PostgreSQL.

Requisitos:

  • Airflow >=2.0
  • Biblioteca requests
  • Biblioteca pandas
  • psycopg2-binary para conexão ao banco """

from datetime import datetime, timedelta import json import os import requests import pandas as pd from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.postgres.hooks.postgres import PostgresHook

---------------------------------------------------------------------

Configurações do DAG

---------------------------------------------------------------------

DEFAULT_ARGS = { "owner": "data-engineer", "depends_on_past": False, "retries": 1, "retry_delay": timedelta(minutes=5), }

with DAG( dag_id="weather_etl", default_args=DEFAULT_ARGS, description="Extrai clima de cidades brasileiras e carrega no PostgreSQL", schedule_interval="@daily", start_date=datetime(2024, 1, 1), catchup=False, tags=["etl", "weather", "postgres"], ) as dag:

def extract(**context):
    """Extrai dados da API OpenWeatherMap.
    
    A chave da API deve estar definida na variável de ambiente
    `OWM_API_KEY`. Para fins de demonstração, usamos a cidade de
    São Paulo (código 3448439).
    """
    api_key = os.getenv("OWM_API_KEY")
    if not api_key:
        raise ValueError("Variável OWM_API_KEY não definida")
    url = (
        f"https://api.openweathermap.org/data/2.5/weather?"
        f"id=3448439&units=metric&appid={api_key}"
    )
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    data = response.json()
    # Salva JSON bruto para debugging
    context["ti"].xcom_push(key="raw_weather", value=data)
    return data

def transform(**context):
    """Transforma o JSON bruto em um DataFrame simplificado."""
    raw = context["ti"].xcom_pull(key="raw_weather", task_ids="extract")
    df = pd.DataFrame({
        "city": [raw["name"]],
        "temp_c": [raw["main"]["temp"]],
        "humidity": [raw["main"]["humidity"]],
        "weather": [raw["weather"][0]["description"]],
        "timestamp": [datetime.utcfromtimestamp(raw["dt"]).isoformat()],
    })
    # Passa o DataFrame como JSON para a próxima tarefa
    context["ti"].xcom_push(key="weather_df", value=df.to_json(orient="records"))
    return df

def load(**context):
    """Carrega o DataFrame no PostgreSQL usando PostgresHook."""
    json_str = context["ti"].xcom_pull(key="weather_df", task_ids="transform")
    df = pd.read_json(json_str)
    # Conexão via PostgresHook (configurada no Airflow UI ou env vars)
    pg_hook = PostgresHook(postgres_conn_id="postgres_default")
    # Cria a tabela se não existir
    create_sql = """
    CREATE TABLE IF NOT EXISTS weather (
        city TEXT,
        temp_c DOUBLE PRECISION,
        humidity INTEGER,
        weather TEXT,
        ts TIMESTAMP
    );
    """
    pg_hook.run(create_sql)
    # Insere linhas
    for _, row in df.iterrows():
        insert_sql = """
        INSERT INTO weather (city, temp_c, humidity, weather, ts)
        VALUES (%s, %s, %s, %s, %s);
        """
        pg_hook.run(insert_sql, parameters=(
            row["city"], row["temp_c"], row["humidity"], row["weather"], row["timestamp"]
        ))
    return "Loaded {} rows".format(len(df))

extract_task = PythonOperator(
    task_id="extract",
    python_callable=extract,
    provide_context=True,
)

transform_task = PythonOperator(
    task_id="transform",
    python_callable=transform,
    provide_context=True,
)

load_task = PythonOperator(
    task_id="load",
    python_callable=load,
    provide_context=True,
)

extract_task >> transform_task >> load_task

6. Configurando a conexão no Airflow UI

  1. Acesse http://localhost:8080.
  2. No menu Admin > Connections, clique +.
  3. Preencha:
    • Conn Id: postgres_default
    • Conn Type: Postgres
    • Host: postgres
    • Schema: airflow_db
    • Login: airflow
    • Password: airflow
    • Port: 5432
  4. Salve.

Com a conexão criada, o DAG já pode executar as três etapas sem ajustes adicionais.

Desenvolvimento e Código

Exemplos Práticos

6.1. Testando localmente sem Docker

Durante o desenvolvimento, pode ser útil validar as funções de extract e transform fora do Airflow. Basta criar um script test_etl.py:

python

test_etl.py

import os from dags.etl_pipeline import extract, transform

os.environ["OWM_API_KEY"] = "SUA_CHAVE_AQUI"

raw = extract() print("Raw JSON:", raw)

df = transform() print("DataFrame resultante:") print(df)

Execute python test_etl.py e verifique a saída. Essa prática reduz o tempo de depuração, já que o Airflow adiciona overhead de agendamento.

6.2. Monitoramento básico com Airflow UI

  • Tree View: visualiza o histórico de execuções e identifica falhas rapidamente.
  • Log: cada tarefa gera logs detalhados; clique na tarefa para abrir o console.
  • Graph View: mostra dependências entre tarefas, útil ao expandir o pipeline.

6.3. Escalando com Celery Executor

Para pipelines de maior volume, troque o LocalExecutor por CeleryExecutor e adicione workers ao docker‑compose. O código do DAG permanece inalterado, demonstrando a vantagem de separar lógica de orquestração da infraestrutura.

Conclusão

Neste post construímos, passo a passo, um pipeline ETL completo usando Apache Airflow, Docker e PostgreSQL. Você aprendeu a:

  1. Estruturar o projeto de forma modular;
  2. Criar imagens Docker customizadas com dependências adicionais;
  3. Definir serviços com Docker Compose, incluindo health‑checks;
  4. Escrever um DAG que extrai, transforma e carrega dados de forma idempotente;
  5. Configurar a conexão ao banco via Airflow UI e monitorar execuções.

Próximos passos recomendados:

  • Integrar testes unitários com pytest e airflow‑test;
  • Implementar alertas via Slack ou e‑mail usando EmailOperator;
  • Migrar o executor para Celery ou Kubernetes quando a carga crescer.

Com essa base, você está pronto para expandir o pipeline, adicionar novas fontes (S3, Kafka) e aplicar boas práticas de CI/CD para pipelines de dados.

Tecnologia e Negócios

Carregando comentários...

Posts Relacionados

Não há posts relacionados no momento.