datapulse-datalake-api + ECS one-shot processor
SRE feedback on plan v2 (sync 200): bronze processing (download + ingest + upload) is heavy enough that holding the agent's connection open for it is inappropriate. The analysis is correct: a typical tick takes 5-30s, and multiple concurrent pipelines would saturate the sync server.
Pivot to an end-to-end asynchronous architecture with a new repo dedicated to intake:
Agent → datalake-api (NEW repo, FastAPI thin) → S3 + SQS → 202
↓
SQS bronze-ingest → Lambda → ECS one-shot bronze
↓
S3 + SQS bronze-completed
↓
Lambda → ECS one-shot silver (already implemented ✅)
Decisions confirmed with the user:
- upflux-datapulse-datalake-api (new): intake only. The Hub backend is NOT the place for this (Hub is admin/config; intake is data plane).
- X-Api-Key/X-Api-Secret + Hub-sourced cache. The API queries the Hub at startup (and periodically, TTL 5min) to cache {org_id → api_secret}. Validation is local per request, with no hop per call.

| Aspect | v2 (sync) | v3 (final) |
|---|---|---|
| Datalake runtime | FastAPI long-lived (ECS Service) | ECS Fargate one-shot triggered by Lambda |
| Agent intake | POST directly to the datalake | POST to the new datapulse-datalake-api |
| Response to the agent | Sync 200 (5-30s) | Async 202 (~500ms-2s, upload + SQS publish only) |
| Bronze trigger | In-process | SQS bronze-ingest → Lambda → ECS |
| Silver trigger | SQS bronze-completed ✅ | Kept |
| Bronze S3 path | ✅ <org>/<pipeline>/bronze.duckdb.zip | Kept |
| Flat schema | ✅ bronze_<table_base> | Kept |
| Single ZIP from the agent | ✅ | Kept |
| Manifest inside the ZIP | ✅ | Kept |
| bronze/service/ (FastAPI) | Built | Deleted (~600 lines) |
| state_db.py, writer_registry.py | Built | Deleted |
| writer.py, tick_processor.py, lock.py, state_sync.py | ✅ | Kept |
datapulse-datalake-api
POST /v1/ingest
Headers (required):
  X-Api-Key: org_id (Hub config snapshot)
  X-Api-Secret: api_secret (Fernet-decrypted by agent via payload_key)
  X-Tick-Id: e.g. 20260508T120000__abc12345
  X-Project-Id: UUID
  X-Pipeline: string (Project.pipeline from Hub)
  X-Mode: "delta" | "full"
  X-Dt: YYYYMMDD
  Content-Type: application/zip
Body: bronze_<tick>.zip (parquets + _manifest.json at the root)
Returns:
  202 Accepted {
    "tick_id": "...",
    "status": "queued",
    "queued_at": "...",
    "zip_key": "<prefix>/<org>/<pipeline>/incoming/<tick>.zip"
  }
  Synchronous only for the upload + SQS publish (~500ms-2s).
  Bronze processing is asynchronous.
  400 Bad Request: missing/invalid header
  401 Unauthorized: X-Api-Key/Secret missing or invalid
  403 Forbidden: org doesn't match X-Api-Key
  409 Conflict: tick_id already queued/processed (idempotent; returns the current status)
  413 Payload Too Large: ZIP > MAX_ZIP_BYTES (default 500MB)
  422 Unprocessable: body is not a valid ZIP
  5xx: server error, agent retries with backoff
GET /healthz: ALB health check, returns 200 once the Hub cache has loaded
GET /readyz: optional, cache health
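As a rough illustration of the contract above, the non-auth header checks and the `zip_key` layout could be expressed as two small pure functions. This is only a sketch under assumptions: `IngestHeaders`, `parse_ingest_headers`, and `incoming_zip_key` are hypothetical names that do not appear in the plan, and the tick-id pattern is inferred from the single example value.

```python
import re
import uuid
from dataclasses import dataclass
from datetime import datetime

# Assumption: tick ids follow the shape of the example "20260508T120000__abc12345".
_TICK_RE = re.compile(r"\d{8}T\d{6}__[0-9A-Za-z]+")
_REQUIRED = ("x-tick-id", "x-project-id", "x-pipeline", "x-mode", "x-dt")


@dataclass(frozen=True)
class IngestHeaders:
    tick_id: str
    project_id: str
    pipeline: str
    mode: str
    dt: str


def parse_ingest_headers(headers: dict[str, str]) -> IngestHeaders:
    """Validate the non-auth headers; a route would turn ValueError into a 400."""
    h = {name.lower(): value for name, value in headers.items()}
    missing = [name for name in _REQUIRED if not h.get(name)]
    if missing:
        raise ValueError(f"missing headers: {', '.join(missing)}")
    if not _TICK_RE.fullmatch(h["x-tick-id"]):
        raise ValueError("X-Tick-Id has an unexpected format")
    uuid.UUID(h["x-project-id"])  # raises ValueError if not a UUID
    if h["x-mode"] not in ("delta", "full"):
        raise ValueError("X-Mode must be 'delta' or 'full'")
    if not re.fullmatch(r"\d{8}", h["x-dt"]):
        raise ValueError("X-Dt must be YYYYMMDD")
    datetime.strptime(h["x-dt"], "%Y%m%d")  # rejects impossible calendar dates
    return IngestHeaders(
        h["x-tick-id"], h["x-project-id"], h["x-pipeline"], h["x-mode"], h["x-dt"]
    )


def incoming_zip_key(prefix: str, org_id: str, headers: IngestHeaders) -> str:
    """Build <prefix>/<org>/<pipeline>/incoming/<tick>.zip; an empty prefix is skipped."""
    parts = [prefix.strip("/"), org_id, headers.pipeline, "incoming", f"{headers.tick_id}.zip"]
    return "/".join(part for part in parts if part)
```

Keeping these checks free of FastAPI types makes them easy to unit test without a TestClient.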
bronze-ingest (datalake-api publishes → bronze-trigger Lambda consumes):
{
"schema_version": "1.0",
"org_id": "01",
"pipeline": "p2p",
"tick": "20260508T120000__abc",
"zip_key": "<prefix>/01/p2p/incoming/20260508T120000__abc.zip",
"project_id": "<uuid>",
"mode": "delta",
"dt": "20260508",
"queued_at": "2026-05-08T12:00:30Z"
}
bronze-completed (bronze ECS publishes → silver-trigger Lambda consumes; already implemented):
{
"schema_version": "1.0",
"org_id": "01",
"pipeline": "p2p",
"tick": "...",
"db_file": "<prefix>/01/p2p/bronze.duckdb.zip",
"project_id": "<uuid>",
"mode": "delta",
"rows_inserted": 1234,
"rows_replaced": 56,
"emitted_at": "..."
}
- incoming/<tick>.zip: if it already exists, respond 409 with {status: "queued"} (the agent treats it as success and cleans up locally).
- The _run_id check is the final idempotency guard (re-running the same run_id is a no-op).
- Orphaned incoming/<tick>.zip (ECS uploaded the lock but crashed before the DELETE): the operator runs a manual cleanup, or an S3 lifecycle rule expires it after 7 days.

upflux-datapulse-datalake-api
New repo, thin FastAPI intake. Zero DuckDB, zero processing.
upflux-datapulse-datalake-api/
├── pyproject.toml              # FastAPI, uvicorn, boto3, pydantic-settings, loguru, requests
├── Dockerfile                  # multistage Python 3.12-slim
├── main.py                     # uvicorn entry, configure_logging, create_app
├── app/
│   ├── __init__.py
│   ├── api.py                  # FastAPI factory + lifespan (Hub cache refresh)
│   ├── core/
│   │   ├── config.py           # Settings (BA pattern, same as the other repos)
│   │   ├── logging.py          # configure_logging (loguru + InterceptHandler)
│   │   └── apm.py              # (opt-in) Elastic APM
│   ├── auth.py                 # X-Api-Key/Secret validation against Hub cache
│   ├── hub_client.py           # GET /api/internal/agent-credentials (cache TTL 5min)
│   ├── routes/
│   │   ├── __init__.py
│   │   ├── ingest.py           # POST /v1/ingest (streaming ZIP → S3 + SQS)
│   │   └── health.py           # /healthz, /readyz
│   └── services/
│       ├── s3_uploader.py      # boto3 multipart streaming upload
│       └── sqs_publisher.py    # SendMessage with retries
├── tests/
│   ├── conftest.py
│   └── unit/
│       ├── test_ingest_route.py    # FastAPI TestClient + moto S3 + moto SQS
│       ├── test_auth.py            # cache hit/miss, expired, invalid
│       └── test_hub_client.py      # responses lib mocking Hub
├── docker-compose.dev.yml      # MinIO + LocalStack SQS for dev
├── README.md
├── CLAUDE.md
├── .env.example
└── bitbucket-pipelines.yml
Settings (app/core/config.py):
from pydantic import Field
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Service identity
    app_name: str = Field(default="datalake-api", alias="APP_NAME")
    host: str = Field(default="0.0.0.0", alias="HOST")
    port: int = Field(default=8080, alias="PORT")
    log_level: str = Field(default="INFO", alias="LOG_LEVEL")
    # Hub
    hub_url: str = Field(..., alias="HUB_URL")
    hub_token: str = Field(default="", alias="HUB_TOKEN")
    hub_credentials_ttl_seconds: int = Field(default=300, alias="HUB_CREDENTIALS_TTL_SECONDS")
    # S3
    aws_s3_bucket: str = Field(..., alias="AWS_S3_BUCKET")
    aws_s3_region: str = Field(default="us-east-1", alias="AWS_S3_REGION")
    aws_s3_endpoint_url: str = Field(default="", alias="AWS_S3_ENDPOINT_URL")
    aws_s3_prefix: str = Field(default="", alias="AWS_S3_PREFIX")
    aws_access_key_id: str = Field(default="", alias="AWS_ACCESS_KEY_ID")
    aws_secret_access_key: str = Field(default="", alias="AWS_SECRET_ACCESS_KEY")
    s3_multipart_chunk_bytes: int = Field(default=8 * 1024 * 1024, alias="S3_MULTIPART_CHUNK_BYTES")
    # SQS
    sqs_bronze_ingest_url: str = Field(..., alias="SQS_BRONZE_INGEST_URL")
    # Limits
    max_zip_bytes: int = Field(default=500 * 1024 * 1024, alias="MAX_ZIP_BYTES")  # 500MB
Auth flow (app/auth.py):
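import hmac
from dataclasses import dataclass
from datetime import UTC, datetime

from fastapi import Header, HTTPException, Request


@dataclass(frozen=True)
class AuthContext:
    # Not defined in the plan; minimal assumed shape.
    org_id: str


class AuthCache:
    """In-memory cache of {org_id → api_secret}. Refreshed periodically
    from Hub via background task. Lookup is O(1)."""

    def __init__(self, hub_client, ttl_seconds=300):
        self._hub = hub_client
        self._cache: dict[str, str] = {}
        self._loaded_at: datetime | None = None
        self._ttl = ttl_seconds

    async def refresh(self):
        # GET /api/internal/agent-credentials
        # Hub returns [{org_id, api_secret}] (already decrypted server-side
        # using the same payload_key the agent has)
        creds = await self._hub.list_agent_credentials()
        self._cache = {c["org_id"]: c["api_secret"] for c in creds}
        self._loaded_at = datetime.now(UTC)

    def is_valid(self, org_id: str, secret: str) -> bool:
        expected = self._cache.get(org_id)
        if expected is None:
            return False
        # Constant-time comparison, so timing does not leak the secret.
        return hmac.compare_digest(expected.encode(), secret.encode())


async def require_auth(
    request: Request,
    x_api_key: str | None = Header(default=None),
    x_api_secret: str | None = Header(default=None),
) -> AuthContext:
    cache: AuthCache = request.app.state.auth_cache
    # Missing headers are answered with 401 (not FastAPI's default 422).
    if not x_api_key or not x_api_secret or not cache.is_valid(x_api_key, x_api_secret):
        raise HTTPException(401, "invalid credentials")
    return AuthContext(org_id=x_api_key)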
Lifespan boots HubClient + AuthCache (initial refresh) + S3 client + SQS client + background task that re-refreshes cache every TTL.
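The lifespan behaviour described above (initial refresh, then a background task that refreshes every TTL) could look roughly like this. It is a sketch, not the planned implementation: `refresh_forever` is a hypothetical helper, `cache` is any object with an async `refresh()` method, and a real FastAPI lifespan receives only `app`, so the extra arguments here would need to be bound beforehand or read from settings. The S3 and SQS clients are omitted.

```python
import asyncio
import contextlib


async def refresh_forever(cache, interval_seconds: float) -> None:
    """Call cache.refresh() every interval; keep the old snapshot on failure."""
    while True:
        await asyncio.sleep(interval_seconds)
        try:
            await cache.refresh()
        except Exception:
            # Assumption: serving slightly stale credentials is preferable
            # to crashing the refresher when the Hub is briefly unavailable.
            continue


@contextlib.asynccontextmanager
async def lifespan(app, *, cache, interval_seconds: float = 300.0):
    await cache.refresh()  # initial load before serving traffic
    app.state.auth_cache = cache
    task = asyncio.create_task(refresh_forever(cache, interval_seconds))
    try:
        yield
    finally:
        # Stop the background task cleanly on shutdown.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task
```

Because the first refresh is awaited before the `yield`, a /healthz that depends on the cache being loaded would only start returning 200 after startup has succeeded.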
Streaming upload (app/services/s3_uploader.py):
async def stream_to_s3(*, request_stream, s3_key: str, content_length: int) -> str:
    """Stream a request body to S3 using boto3 multipart upload.
    Returns the ETag of the uploaded object. Avoids loading the full
    body into memory: chunks of S3_MULTIPART_CHUNK_BYTES are uploaded
    as they arrive."""
    # Use boto3 create_multipart_upload + upload_part loop.
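One possible shape for the multipart loop mentioned in the stub above, written as a sketch with a different, hypothetical signature (`stream_chunks_to_s3`). It assumes `s3_client` is a synchronous boto3 S3 client, so each call is pushed to a worker thread with `asyncio.to_thread`, and `chunks` is any async iterator of bytes such as a request body stream. Enforcing MAX_ZIP_BYTES is left out.

```python
import asyncio
from collections.abc import AsyncIterator


async def stream_chunks_to_s3(
    s3_client, *, bucket: str, s3_key: str,
    chunks: AsyncIterator[bytes], part_size: int = 8 * 1024 * 1024,
) -> str:
    """Upload an async byte stream via S3 multipart upload; return the final ETag."""
    upload = await asyncio.to_thread(
        s3_client.create_multipart_upload, Bucket=bucket, Key=s3_key
    )
    upload_id = upload["UploadId"]
    parts: list[dict] = []
    buffer = bytearray()

    async def flush(data: bytes) -> None:
        number = len(parts) + 1
        result = await asyncio.to_thread(
            s3_client.upload_part, Bucket=bucket, Key=s3_key,
            UploadId=upload_id, PartNumber=number, Body=data,
        )
        parts.append({"PartNumber": number, "ETag": result["ETag"]})

    try:
        async for chunk in chunks:
            buffer.extend(chunk)
            # S3 requires every part except the last to be at least 5 MiB,
            # so full parts are cut at exactly part_size.
            while len(buffer) >= part_size:
                await flush(bytes(buffer[:part_size]))
                del buffer[:part_size]
        if buffer:
            await flush(bytes(buffer))  # final, possibly smaller, part
        if not parts:
            raise ValueError("empty body")
        done = await asyncio.to_thread(
            s3_client.complete_multipart_upload, Bucket=bucket, Key=s3_key,
            UploadId=upload_id, MultipartUpload={"Parts": parts},
        )
        return done["ETag"]
    except Exception:
        # Abort so S3 does not keep (and bill for) orphaned parts.
        await asyncio.to_thread(
            s3_client.abort_multipart_upload,
            Bucket=bucket, Key=s3_key, UploadId=upload_id,
        )
        raise
```

Memory use stays bounded by roughly one part plus one incoming chunk, which is the property the docstring above asks for.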
Tests: hermetic (moto for S3+SQS, responses for Hub).
upflux-datapulse-datalake (cleanup + ECS one-shot)
DELETE:
- bronze/service/ (app, lifecycle, routes, auth, writer_registry, state_db): this v2 work is discarded
- tests/unit/test_state_db.py
- fastapi, uvicorn, python-multipart, httpx, pytest-asyncio from pyproject
KEEP (no changes; all the useful v2 work):
- bronze/writer.py: flat schema + _tick_id + full mode (✅)
- bronze/tick_processor.py: process_tick_local (✅)
- bronze/state_sync.py: org/pipeline paths (✅)
- bronze/lock.py: pipeline-keyed lock (✅)
- bronze/manifest.py
- bronze/hub_client.py: BronzeConfig.pipeline (✅)
- bronze/sqs_publisher.py (move from bronze/service/ to bronze/)
ADD:
- bronze/zip_ingest.py:
def process_zip_from_s3(
    *,
    s3_zip_key: str,
    org_id: str,
    pipeline: str,
    tick_id: str,
    project_id: str,
    ...
) -> RunSummary:
    """One-shot: download incoming ZIP from S3, ingest into the
    pipeline's consolidated bronze.duckdb, upload, delete incoming,
    publish bronze-completed SQS."""
- main.py:
bronze process-zip \
    --s3-zip-key <key> \
    --org-id 01 --pipeline p2p \
    --tick-id ... --project-id ...
- infra/lambda/bronze_trigger/handler.py (parallel to silver_trigger):
  - consumes bronze-ingest
  - ecs.list_tasks by tag pipeline=<P>
  - ecs.run_task with env overrides
Modify:
- Dockerfile: entrypoint goes back to ["python", "main.py", "process-zip"] (one-shot)
- bronze/config.py: drop HTTP_HOST/HTTP_PORT/SQS_QUEUE_URL (can be re-enabled); keep SQS_BRONZE_COMPLETED_QUEUE_URL

upflux-datapulse-agent (1-step POST to datalake-api)
Simpler than the 3-step flow I considered earlier.
Modify bronze_uploader.py:
def _process_tick(self, tick_id):
    # 1. Build ZIP (same as today)
    zip_path = build_tick_zip(tick_id, parquet_rows, manifest_row)
    # 2. POST to datalake-api (single request, async on the server side)
    result = sender.send_tick_zip_to_api(
        api_url=client.get("datalake_api_url"),  # NEW field in config snapshot
        zip_path=zip_path,
        org_id=org_id,
        api_secret=api_secret,
        tick_id=tick_id,
        project_id=project_id,
        pipeline=client.get("pipeline"),  # ALREADY in snapshot
        mode=manifest.get("mode"),
        dt=manifest.get("dt"),
    )
    # 202 → clean up locally + done
    # 409 → already_queued → clean up locally + done
    # 422 → mark failed_manual
    # 5xx → bump attempt, retry next poll
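The status handling listed in the comments above can be captured in a small pure function, which keeps the retry policy testable on its own. This is a sketch: `TickOutcome` and `classify_ingest_response` are hypothetical names, and the treatment of 400/401/403/413 is an assumption because the plan does not specify it.

```python
from enum import Enum


class TickOutcome(Enum):
    DONE = "done"                    # clean up the local ZIP, nothing else to do
    FAILED_MANUAL = "failed_manual"  # needs operator attention
    RETRY = "retry"                  # bump attempt, retry on the next poll


def classify_ingest_response(status_code: int) -> TickOutcome:
    """Map a datalake-api status code to the agent-side action."""
    if status_code in (202, 409):
        # 409 means the tick is already queued, so it counts as success.
        return TickOutcome.DONE
    if status_code == 422:
        return TickOutcome.FAILED_MANUAL
    if 500 <= status_code <= 599:
        return TickOutcome.RETRY
    # Assumption: other client errors (400/401/403/413) will not succeed
    # on retry, so this sketch parks them for manual review.
    return TickOutcome.FAILED_MANUAL
```

For example, `classify_ingest_response(409)` yields `TickOutcome.DONE`, matching the rule that an already-queued tick is treated as a success.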
Modify sender.py:
- Drop send_tick_zip (was v2: POST directly to the datalake)
- Add send_tick_zip_to_api(*, api_url, zip_path, org_id, api_secret, tick_id, project_id, pipeline, mode, dt):
  - X-Api-Key/X-Api-Secret/X-Tick-Id/X-Project-Id/X-Pipeline/X-Mode/X-Dt
  - TickIngestError(status_code, body)
- send_to_raw_data_loader (legacy, for the trigger)

upflux-datapulse-backend (Hub)
Small additions to support the datalake-api credentials cache:
- GET /api/internal/agent-credentials:
  - Auth: HUB_SERVICE_TOKEN.
  - Returns [{"org_id": "01", "api_secret": "<plaintext>"}]: the Hub decrypts Client.api_secret (which is encrypted at rest with ENCRYPTION_KEY) before returning it.
- Client.datalake_api_url (similar to the existing datalake_url): new field on the model + Alembic migration. The agent reads it from the config snapshot.
- HUB_SERVICE_TOKEN env var.
Existing UNIQUE (client_id, pipeline) migration: kept.

upflux-datapulse-transformer (no changes)
Silver Lambda + ECS are already SQS-driven with the BRONZE_DB_KEY env var. Works without changes.

Datalake: clean up FastAPI service + ZIP processor (1 sprint)
- Delete bronze/service/ package + tests + FastAPI deps
- Move sqs_publisher.py to bronze/
- bronze/zip_ingest.process_zip_from_s3
- bronze process-zip
- test_zip_ingest.py
Datalake: bronze-trigger Lambda (3 days)
New repo: datapulse-datalake-api (1 sprint)
Backend: agent-credentials endpoint (3 days)
- agent_credentials.py: list clients + decrypt secrets
- /api/internal/agent-credentials (HUB_SERVICE_TOKEN auth)
- Client.datalake_api_url column
Agent: single-POST flow (2 days)
- sender.send_tick_zip_to_api
- bronze_uploader._process_tick to use new sender
Cutover deploy (1-2 days smoke + ship)

- test_zip_ingest.py (~6 tests) + test_bronze_trigger_handler.py (~5 tests).
- test_bronze_uploader.py for new sender shape (~85 tests stay green)
- test_agent_credentials.py for the internal endpoint (auth, decrypt, list shape)
Integration smoke (docker-compose):
- MinIO + LocalStack SQS + Hub + datalake-api (uvicorn) + agent + bronze ECS task as subprocess
Pre-cutover load test:
- /v1/ingest < 2s for a 100MB ZIP

NEW repo upflux-datapulse-datalake-api/: all the files listed in the structure above.
upflux-datapulse-datalake/:
- bronze/zip_ingest.py (new)
- bronze/sqs_publisher.py (moved from bronze/service/)
- main.py: process-zip Typer command
- Dockerfile: entrypoint goes back to one-shot
- infra/lambda/bronze_trigger/handler.py (new)
- bronze/service/ (entire package) + tests/unit/test_state_db.py (deleted)
upflux-datapulse-agent/:
- sender.py: drop send_tick_zip, add send_tick_zip_to_api
- bronze_uploader.py: _process_tick calls the new sender
upflux-datapulse-backend/:
- app/services/agent_credentials.py (new)
- app/routes/internal.py (new, or reuse an existing route): GET /api/internal/agent-credentials
- app/schemas/agent.py: add datalake_api_url to ClientSnapshot
- app/models/client.py: add datalake_api_url column
- alembic/versions/datalake_api_url_001.py (new)

- datapulse-datalake-api kept separate from datapulse-datalake
- bronze_<table_base> (no client_<org> prefix)
- <org_id>/<pipeline>/bronze.duckdb.zip consolidated
- incoming/<tick>.zip cleanup: ECS deletes after success + S3 lifecycle 7d as fallback