Plan — Bronze v3 (final): split intake into datapulse-datalake-api + ECS one-shot processor

Context

SRE feedback on the v2 plan (sync 200): bronze processing (download + ingest + upload) is heavy enough that holding the agent's connection open for it is inappropriate. The analysis is correct: a typical tick takes 5-30s, and multiple concurrent pipelines would saturate the sync server.

Pivot to an end-to-end asynchronous architecture, with a new repo dedicated to intake:

Agent → datalake-api (NEW repo, FastAPI thin) → S3 + SQS → 202
                                                  ↓
                                  SQS bronze-ingest → Lambda → ECS one-shot bronze
                                                                  ↓
                                                       S3 + SQS bronze-completed
                                                                  ↓
                                                       Lambda → ECS one-shot silver (already implemented ✅)

Decisions confirmed with the user:

What changes vs v2 (sync, long-lived FastAPI in the datalake)

| Aspect | v2 (sync) | v3 (final) |
|---|---|---|
| Datalake runtime | Long-lived FastAPI (ECS Service) | One-shot ECS Fargate task triggered by Lambda |
| Agent intake | POST directly to the datalake | POST to the new datapulse-datalake-api |
| Response to the agent | Sync 200 (5-30s) | Async 202 (~500ms-2s; upload + SQS publish only) |
| Bronze trigger | In-process | SQS bronze-ingest → Lambda → ECS |
| Silver trigger | SQS bronze-completed ✅ | Kept |
| Bronze S3 path | ✅ <org>/<pipeline>/bronze.duckdb.zip | Kept |
| Flat schema | ✅ bronze_<table_base> | Kept |
| Single ZIP from the agent | ✅ | Kept |
| Manifest inside the ZIP | ✅ | Kept |
| bronze/service/ (FastAPI) | Built | Deleted (~600 lines) |
| state_db.py, writer_registry.py | Built | Deleted |
| writer.py, tick_processor.py, lock.py, state_sync.py | | Kept |

Architecture

[Diagram]

Sequence

[Diagram]

HTTP contract — datapulse-datalake-api

POST /v1/ingest
     Headers (required):
       X-Api-Key      — org_id (Hub config snapshot)
       X-Api-Secret   — api_secret (Fernet-decrypted by agent via payload_key)
       X-Tick-Id      — e.g. 20260508T120000__abc12345
       X-Project-Id   — UUID
       X-Pipeline     — string (Project.pipeline from Hub)
       X-Mode         — "delta" | "full"
       X-Dt           — YYYYMMDD
       Content-Type   — application/zip

     Body: bronze_<tick>.zip (parquets + _manifest.json at the root)

     Returns:
       202 Accepted {
         "tick_id": "...",
         "status": "queued",
         "queued_at": "...",
         "zip_key": "<prefix>/<org>/<pipeline>/incoming/<tick>.zip"
       }
       Synchronous only for the upload + SQS publish (~500ms-2s).
       Bronze processing happens asynchronously.

       400 Bad Request — missing/invalid header
       401 Unauthorized — X-Api-Key/Secret missing or invalid
       403 Forbidden — org doesn't match X-Api-Key
       409 Conflict — tick_id already queued/processed (idempotent; returns the current status)
       413 Payload Too Large — ZIP > MAX_ZIP_BYTES (default 500MB)
       422 Unprocessable — body is not a valid ZIP
       5xx — server error; the agent retries with backoff

GET /healthz       — ALB health check; 200 once the Hub cache has loaded
GET /readyz        — optional: cache health
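The 400 row above covers every routing header. A minimal sketch of that validation in plain Python, so it can run before any S3 work starts. The tick-suffix and pipeline character sets are assumptions (the plan only shows examples such as `20260508T120000__abc12345` and `p2p`); they are kept narrow because both values end up inside the S3 key. `BadRequest`, `IngestHeaders` and `parse_ingest_headers` are hypothetical names, not existing code.

```python
import re
import uuid
from dataclasses import dataclass
from datetime import datetime

# Assumed formats: both values are embedded in
# <prefix>/<org>/<pipeline>/incoming/<tick>.zip, so "/" and ".." must not pass.
TICK_ID_RE = re.compile(r"\d{8}T\d{6}__[A-Za-z0-9]+")
PIPELINE_RE = re.compile(r"[A-Za-z0-9_-]+")


class BadRequest(ValueError):
    """Missing or malformed header; the route would map it to 400."""


@dataclass(frozen=True)
class IngestHeaders:
    tick_id: str
    project_id: str
    pipeline: str
    mode: str
    dt: str


def parse_ingest_headers(headers: dict[str, str]) -> IngestHeaders:
    # HTTP header names are case-insensitive, so normalize before lookup.
    h = {k.lower(): v.strip() for k, v in headers.items()}

    def need(name: str) -> str:
        value = h.get(name.lower(), "")
        if not value:
            raise BadRequest(f"missing header {name}")
        return value

    tick_id = need("X-Tick-Id")
    if not TICK_ID_RE.fullmatch(tick_id):
        raise BadRequest("X-Tick-Id must look like YYYYMMDDTHHMMSS__<suffix>")
    project_id = need("X-Project-Id")
    try:
        uuid.UUID(project_id)
    except ValueError:
        raise BadRequest("X-Project-Id must be a UUID") from None
    pipeline = need("X-Pipeline")
    if not PIPELINE_RE.fullmatch(pipeline):
        raise BadRequest("X-Pipeline has unsupported characters")
    mode = need("X-Mode")
    if mode not in ("delta", "full"):
        raise BadRequest('X-Mode must be "delta" or "full"')
    dt = need("X-Dt")
    try:
        if len(dt) != 8:
            raise ValueError(dt)
        datetime.strptime(dt, "%Y%m%d")  # also rejects month 13, day 32, etc.
    except ValueError:
        raise BadRequest("X-Dt must be a valid YYYYMMDD date") from None
    return IngestHeaders(tick_id, project_id, pipeline, mode, dt)
```

In the route, a `BadRequest` would become `HTTPException(400, str(exc))`.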

SQS shapes

bronze-ingest (published by datalake-api → consumed by the bronze-trigger Lambda):

{
  "schema_version": "1.0",
  "org_id": "01",
  "pipeline": "p2p",
  "tick": "20260508T120000__abc",
  "zip_key": "<prefix>/01/p2p/incoming/20260508T120000__abc.zip",
  "project_id": "<uuid>",
  "mode": "delta",
  "dt": "20260508",
  "queued_at": "2026-05-08T12:00:30Z"
}

bronze-completed (published by bronze ECS → consumed by the silver-trigger Lambda; already implemented):

{
  "schema_version": "1.0",
  "org_id": "01",
  "pipeline": "p2p",
  "tick": "...",
  "db_file": "<prefix>/01/p2p/bronze.duckdb.zip",
  "project_id": "<uuid>",
  "mode": "delta",
  "rows_inserted": 1234,
  "rows_replaced": 56,
  "emitted_at": "..."
}

Idempotency + failures

Per-repo changes

NEW: upflux-datapulse-datalake-api

New repo: a thin FastAPI intake service. No DuckDB, no processing.

upflux-datapulse-datalake-api/
├── pyproject.toml          # FastAPI, uvicorn, boto3, pydantic-settings, loguru, requests
├── Dockerfile              # multistage Python 3.12-slim
├── main.py                 # uvicorn entry, configure_logging, create_app
├── app/
│   ├── __init__.py
│   ├── api.py              # FastAPI factory + lifespan (Hub cache refresh)
│   ├── core/
│   │   ├── config.py       # Settings (same BA pattern as the other repos)
│   │   ├── logging.py      # configure_logging (loguru + InterceptHandler)
│   │   └── apm.py          # (opt-in) Elastic APM
│   ├── auth.py             # X-Api-Key/Secret validation against Hub cache
│   ├── hub_client.py       # GET /api/internal/agent-credentials (cache TTL 5min)
│   ├── routes/
│   │   ├── __init__.py
│   │   ├── ingest.py       # POST /v1/ingest (streaming ZIP → S3 + SQS)
│   │   └── health.py       # /healthz, /readyz
│   └── services/
│       ├── s3_uploader.py  # boto3 multipart streaming upload
│       └── sqs_publisher.py # SendMessage with retries
├── tests/
│   ├── conftest.py
│   └── unit/
│       ├── test_ingest_route.py    # FastAPI TestClient + moto S3 + moto SQS
│       ├── test_auth.py            # cache hit/miss, expired, invalid
│       └── test_hub_client.py      # responses lib mocking Hub
├── docker-compose.dev.yml  # MinIO + LocalStack SQS for dev
├── README.md
├── CLAUDE.md
├── .env.example
└── bitbucket-pipelines.yml

Settings (app/core/config.py):

from pydantic import Field
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Service identity
    app_name: str = Field(default="datalake-api", alias="APP_NAME")
    host: str = Field(default="0.0.0.0", alias="HOST")
    port: int = Field(default=8080, alias="PORT")
    log_level: str = Field(default="INFO", alias="LOG_LEVEL")

    # Hub
    hub_url: str = Field(..., alias="HUB_URL")
    hub_token: str = Field(default="", alias="HUB_TOKEN")
    hub_credentials_ttl_seconds: int = Field(default=300, alias="HUB_CREDENTIALS_TTL_SECONDS")

    # S3
    aws_s3_bucket: str = Field(..., alias="AWS_S3_BUCKET")
    aws_s3_region: str = Field(default="us-east-1", alias="AWS_S3_REGION")
    aws_s3_endpoint_url: str = Field(default="", alias="AWS_S3_ENDPOINT_URL")
    aws_s3_prefix: str = Field(default="", alias="AWS_S3_PREFIX")
    aws_access_key_id: str = Field(default="", alias="AWS_ACCESS_KEY_ID")
    aws_secret_access_key: str = Field(default="", alias="AWS_SECRET_ACCESS_KEY")
    s3_multipart_chunk_bytes: int = Field(default=8 * 1024 * 1024, alias="S3_MULTIPART_CHUNK_BYTES")

    # SQS
    sqs_bronze_ingest_url: str = Field(..., alias="SQS_BRONZE_INGEST_URL")

    # Limits
    max_zip_bytes: int = Field(default=500 * 1024 * 1024, alias="MAX_ZIP_BYTES")  # 500MB

Auth flow (app/auth.py):

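import hmac
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta

from fastapi import Header, HTTPException, Request


@dataclass(frozen=True)
class AuthContext:
    org_id: str


class AuthCache:
    """In-memory cache of {org_id → api_secret}. Refreshed periodically
    from Hub via background task. Lookup is O(1)."""

    def __init__(self, hub_client, ttl_seconds: int = 300):
        self._hub = hub_client
        self._cache: dict[str, str] = {}
        self._loaded_at: datetime | None = None
        self._ttl = timedelta(seconds=ttl_seconds)

    async def refresh(self) -> None:
        # GET /api/internal/agent-credentials
        # Hub returns [{org_id, api_secret}] (already decrypted server-side
        # using the same payload_key the agent has)
        creds = await self._hub.list_agent_credentials()
        # Build the new dict first, then swap it in with a single assignment.
        self._cache = {c["org_id"]: c["api_secret"] for c in creds}
        self._loaded_at = datetime.now(UTC)

    def is_loaded(self) -> bool:
        # Backs /healthz: 200 only once the first Hub refresh has succeeded.
        return self._loaded_at is not None

    def is_stale(self) -> bool:
        # Candidate signal for /readyz: no successful refresh within one TTL.
        return self._loaded_at is None or datetime.now(UTC) - self._loaded_at > self._ttl

    def is_valid(self, org_id: str, secret: str) -> bool:
        expected = self._cache.get(org_id)
        # Constant-time comparison so response timing doesn't leak the secret.
        return expected is not None and hmac.compare_digest(
            expected.encode(), secret.encode()
        )


async def require_auth(
    request: Request,
    # Optional at the FastAPI level so a missing header yields 401 (per the
    # contract) instead of FastAPI's default 422 validation error.
    x_api_key: str | None = Header(default=None),
    x_api_secret: str | None = Header(default=None),
) -> AuthContext:
    cache: AuthCache = request.app.state.auth_cache
    if not x_api_key or not x_api_secret or not cache.is_valid(x_api_key, x_api_secret):
        raise HTTPException(status_code=401, detail="invalid credentials")
    return AuthContext(org_id=x_api_key)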

The lifespan boots HubClient + AuthCache (initial refresh) + S3 client + SQS client, plus a background task that refreshes the cache every TTL.
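The background refresh can be a small asyncio loop. A sketch, assuming the cache exposes the async `refresh()` shown above; `refresh_forever` and `background_refresh` are hypothetical helpers. A failed refresh keeps the previous snapshot, so a Hub outage shows up as slightly stale credentials instead of rejecting every agent.

```python
import asyncio
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

log = logging.getLogger("datalake_api.auth")


async def refresh_forever(cache: Any, interval_s: float, stop: asyncio.Event) -> None:
    """Call cache.refresh() every interval_s seconds until `stop` is set."""
    while True:
        try:
            # Sleep one interval, but wake immediately if shutdown is requested.
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
            return
        except asyncio.TimeoutError:
            pass
        try:
            await cache.refresh()
        except Exception:
            # Keep serving the previous snapshot; the next tick retries.
            log.exception("Hub credentials refresh failed; keeping previous cache")


@asynccontextmanager
async def background_refresh(cache: Any, interval_s: float) -> AsyncIterator[None]:
    """Run refresh_forever for the lifetime of the `async with` block."""
    stop = asyncio.Event()
    task = asyncio.create_task(refresh_forever(cache, interval_s, stop))
    try:
        yield
    finally:
        stop.set()
        await task
```

Inside the FastAPI lifespan this would read roughly: `await cache.refresh()` for the initial load, then `async with background_refresh(cache, settings.hub_credentials_ttl_seconds): yield`.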

Streaming upload (app/services/s3_uploader.py):

async def stream_to_s3(*, request_stream, s3_key: str, content_length: int) -> str:
    """Stream a request body to S3 using boto3 multipart upload.
    Returns the ETag of the uploaded object. Avoids loading the full
    body into memory — chunks of S3_MULTIPART_CHUNK_BYTES are uploaded
    as they arrive."""
    # Use boto3 create_multipart_upload + upload_part loop.
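The stub above leaves the multipart loop out. One way to fill it in, as a sketch: the S3 client and bucket become explicit parameters (an assumption; the stub's signature does not take them), boto3's blocking calls run in `asyncio.to_thread` so they don't stall the event loop, and bytes are counted as they arrive so `MAX_ZIP_BYTES` is enforced even if `Content-Length` is wrong. `create_multipart_upload`, `upload_part`, `complete_multipart_upload` and `abort_multipart_upload` are boto3's S3 client methods; `PayloadTooLarge` is hypothetical. In FastAPI, `request_stream` would be Starlette's `request.stream()`.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Any

MIN_PART_BYTES = 5 * 1024 * 1024  # S3 minimum for every part except the last


class PayloadTooLarge(Exception):
    """Body exceeded max_bytes; the route would map this to 413."""


async def stream_to_s3(
    *,
    s3_client: Any,
    bucket: str,
    s3_key: str,
    request_stream: AsyncIterator[bytes],
    chunk_bytes: int = 8 * 1024 * 1024,
    max_bytes: int = 500 * 1024 * 1024,
) -> str:
    """Upload an async byte stream to S3 in parts; return the object's ETag."""
    if chunk_bytes < MIN_PART_BYTES:
        raise ValueError("chunk_bytes must be >= 5 MiB for S3 multipart")
    created = await asyncio.to_thread(
        s3_client.create_multipart_upload,
        Bucket=bucket, Key=s3_key, ContentType="application/zip",
    )
    upload_id = created["UploadId"]
    parts: list[dict[str, Any]] = []
    buffer = bytearray()
    total = 0

    async def flush() -> None:
        number = len(parts) + 1
        resp = await asyncio.to_thread(
            s3_client.upload_part,
            Bucket=bucket, Key=s3_key, UploadId=upload_id,
            PartNumber=number, Body=bytes(buffer),
        )
        parts.append({"PartNumber": number, "ETag": resp["ETag"]})
        buffer.clear()

    try:
        async for chunk in request_stream:
            total += len(chunk)
            if total > max_bytes:
                raise PayloadTooLarge(f"body exceeds {max_bytes} bytes")
            buffer.extend(chunk)
            if len(buffer) >= chunk_bytes:
                await flush()  # memory stays around chunk_bytes, not the full body
        if total == 0:
            raise ValueError("empty body")
        if buffer:
            await flush()  # the final part may be smaller than 5 MiB
        done = await asyncio.to_thread(
            s3_client.complete_multipart_upload,
            Bucket=bucket, Key=s3_key, UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )
        return done["ETag"]
    except Exception:
        # Drop already-uploaded parts so they don't keep accruing storage cost.
        await asyncio.to_thread(
            s3_client.abort_multipart_upload,
            Bucket=bucket, Key=s3_key, UploadId=upload_id,
        )
        raise
```

The 8 MiB `S3_MULTIPART_CHUNK_BYTES` default satisfies the 5 MiB part minimum. If the task is cancelled mid-upload, the abort does not run (cancellation is not an `Exception`), so a bucket lifecycle rule that aborts incomplete multipart uploads is a cheap safety net.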

Tests: hermetic (moto for S3+SQS, responses for Hub).

upflux-datapulse-datalake (cleanup + ECS one-shot)

DELETE:

KEEP (no changes; all the useful work from v2):

ADD:

Modify:

upflux-datapulse-agent (single-step POST to datalake-api)

Simpler than the 3-step flow I considered earlier.

Modify bronze_uploader.py:

def _process_tick(self, tick_id):
    # 1. Build ZIP (unchanged from today)
    zip_path = build_tick_zip(tick_id, parquet_rows, manifest_row)

    # 2. POST to datalake-api (single request, async on the server side)
    result = sender.send_tick_zip_to_api(
        api_url=client.get("datalake_api_url"),  # NEW field in config snapshot
        zip_path=zip_path,
        org_id=org_id,
        api_secret=api_secret,
        tick_id=tick_id,
        project_id=project_id,
        pipeline=client.get("pipeline"),  # ALREADY in snapshot
        mode=manifest.get("mode"),
        dt=manifest.get("dt"),
    )
    # 202 → cleanup local + done
    # 409 → already_queued → cleanup local + done
    # 422 → mark failed_manual
    # 5xx → bump attempt, retry next poll

Modify sender.py:

upflux-datapulse-backend (Hub)

Small additions to support the datalake-api credentials cache:

Existing UNIQUE (client_id, pipeline) migration: kept.

upflux-datapulse-transformer (no changes)

Silver Lambda + ECS are already SQS-driven, using the BRONZE_DB_KEY env var. They work without changes.

Phases

  1. Datalake — remove the FastAPI service + add the ZIP processor (1 sprint)

    • Delete the bronze/service/ package + tests + FastAPI deps
    • Move sqs_publisher.py to bronze/
    • Implement bronze/zip_ingest.process_zip_from_s3
    • Typer command bronze process-zip
    • Tests: test_zip_ingest.py
  2. Datalake — bronze-trigger Lambda (3 days)

    • Copy from silver-trigger; adjust the SQS source + env overrides
    • Handler tests
  3. New repo — datapulse-datalake-api (1 sprint)

    • Scaffold (pyproject, Dockerfile, app/, tests/, docker-compose.dev)
    • Settings + logging + APM
    • Hub client + auth cache (with periodic refresh background task)
    • Streaming S3 uploader + SQS publisher
    • POST /v1/ingest route + healthz
    • Hermetic tests
  4. Backend — agent-credentials endpoint (3 days)

    • Service agent_credentials.py — list clients + decrypt secrets
    • Endpoint /api/internal/agent-credentials (HUB_SERVICE_TOKEN auth)
    • Migration: add Client.datalake_api_url column
    • Update agent config snapshot DTO
  5. Agent — single-POST flow (2 days)

    • sender.send_tick_zip_to_api
    • Modify bronze_uploader._process_tick to use new sender
    • Tests
  6. Cutover deploy (1-2 days: smoke + ship)

    • Ship the 5 PRs together. docker-compose E2E smoke (MinIO + LocalStack SQS + Hub + datalake-api + datalake bronze ECS local)
    • Tag releases. Order: Hub → datalake (ECS task def + bronze-trigger) → datalake-api → agent
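Phase 2 copies the silver-trigger Lambda. A sketch of what the bronze version might look like, assuming one Fargate `RunTask` per SQS record with the message fields passed as container env overrides. The env var names are hypothetical; reuse whatever the silver trigger and the `bronze process-zip` command actually read. `run_task` and its `networkConfiguration`/`overrides` shapes are the boto3 ECS API; the `batchItemFailures` return value is Lambda's SQS partial-batch response and only takes effect when `ReportBatchItemFailures` is enabled on the event source mapping.

```python
import json
import os
from typing import Any

# Hypothetical env names read by the `bronze process-zip` task.
MESSAGE_TO_ENV = {
    "zip_key": "ZIP_KEY",
    "org_id": "ORG_ID",
    "pipeline": "PIPELINE",
    "tick": "TICK",
    "project_id": "PROJECT_ID",
    "mode": "MODE",
    "dt": "DT",
}


def handle(
    event: dict[str, Any],
    ecs: Any,
    *,
    cluster: str,
    task_definition: str,
    container: str,
    subnets: list[str],
    security_groups: list[str],
) -> dict[str, list[dict[str, str]]]:
    """Start one Fargate task per SQS record; report the records that failed."""
    failures = []
    for record in event.get("Records", []):
        try:
            msg = json.loads(record["body"])
            env = [{"name": env_name, "value": str(msg[key])}
                   for key, env_name in MESSAGE_TO_ENV.items()]
            resp = ecs.run_task(
                cluster=cluster,
                taskDefinition=task_definition,
                launchType="FARGATE",
                count=1,
                # Assumes private subnets with NAT or VPC endpoints for S3/SQS.
                networkConfiguration={"awsvpcConfiguration": {
                    "subnets": subnets,
                    "securityGroups": security_groups,
                    "assignPublicIp": "DISABLED",
                }},
                overrides={"containerOverrides": [
                    {"name": container, "environment": env},
                ]},
            )
            if resp.get("failures") or not resp.get("tasks"):
                raise RuntimeError(f"run_task failed: {resp.get('failures')}")
        except Exception:
            # Only this message goes back to the queue; the rest of the batch is acked.
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


def lambda_handler(event: dict[str, Any], context: Any) -> dict[str, Any]:
    import boto3  # imported here so handle() is testable without boto3

    return handle(
        event,
        boto3.client("ecs"),
        cluster=os.environ["ECS_CLUSTER"],
        task_definition=os.environ["BRONZE_TASK_DEFINITION"],
        container=os.environ["BRONZE_CONTAINER_NAME"],
        subnets=os.environ["SUBNET_IDS"].split(","),
        security_groups=os.environ["SECURITY_GROUP_IDS"].split(","),
    )
```

SQS delivery is at-least-once, so the same tick can start two tasks; the one-shot processor has to tolerate that.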

Verification

Integration smoke (docker-compose):

MinIO + LocalStack SQS + Hub + datalake-api (uvicorn) + agent + bronze ECS task as subprocess

Pre-cutover load test:

Critical files

NEW repo upflux-datapulse-datalake-api/ — all files listed in the structure above.

upflux-datapulse-datalake/:

upflux-datapulse-agent/:

upflux-datapulse-backend/:

Settled decisions