Plan — Direct-push bronze + SQS trigger chain

Context

The director isn't confident in the current bronze trigger pattern. Today the chain is:

agent (on-prem) → ZIP via HTTPS → raw-data-loader (.NET) → unzip → S3 PUT
S3 PUT on _manifest.json  →  S3 event  →  Lambda(bronze-trigger)  →  ecs.run_task  →  ECS Fargate one-shot (datalake worker)
ECS finishes → S3 PUT on state/<pid>/bronze.duckdb.zip
S3 PUT on bronze.duckdb.zip  →  S3 event  →  Lambda(silver-trigger)  →  ecs.run_task  →  ECS Fargate one-shot (transformer worker)

Three reasons the director is right to push back:

The proposed flow eliminates both event hops and the .NET intermediary:

agent (on-prem)  →  HTTPS POST  →  long-lived datalake service  →  in-process DuckDB ingest
                                                          │
                                                          ↓ on tick complete (sync, same request)
                                                  zip bronze.duckdb → S3 PUT
                                                          │
                                                          ↓ same request, before returning 200
                                                  SQS SendMessage (upflux-datapulse-bronze-completed)
                                                          │
                                                          ↓ silver Lambda subscribes to SQS, not S3
                                                  ecs.run_task (silver/transformer worker — unchanged)

Side-effect benefits:

Costs accepted:

Important context — none of this is running officially in production yet. No live tenants on the current S3-events trigger chain; no historical bronze.duckdb to migrate. That justifies a hard cutover (no feature flag, no shadow-write parity window) and lets us delete the RDL upload path from the agent in the same PR that introduces the chunked uploader. The FastAPI service code lives inside the existing upflux-datapulse-datalake repo under bronze/service/ — we are not spinning up a new repo or new service identity.

Diagrams for the presentation

Current state (what the board is seeing)

[Diagram]

Red = triggers via S3 event (the director's concern: at-least-once delivery, no DLQ, no retry visibility, no backpressure). Yellow = intermediary .NET service that will be retired.

Proposed state

[Diagram]

Green = new service (long-lived Datalake). Blue = SQS replaces the S3 event as trigger. Orange = DynamoDB replaces the _*_ack.json files (audit + dedup). The RDL .NET service goes away. The bronze-trigger Lambda goes away. The silver→gold layer (future) repeats the SQS+DynamoDB pattern.

Detailed sequence of the new flow

[Diagram]

Concurrency model (locks per org_id)

[Diagram]

Within an org, all requests serialize (one DuckDB write at a time, which respects the single-writer invariant). Requests for different orgs run in parallel. DuckDB contention stays low naturally because most orgs have 1-2 projects and a low tick rate (~1/min).
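A minimal sketch of the per-org locking described above, assuming a lazily populated in-process registry. The names `org_lock`, `_org_locks`, and `handle_request` are illustrative, and the `asyncio.sleep` stands in for the DuckDB write.

```python
import asyncio
from collections import defaultdict
from typing import Dict, List

# Hypothetical registry: one asyncio.Lock per org_id, created on first use.
_org_locks: Dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)


def org_lock(org_id: str) -> asyncio.Lock:
    """Return the lock that guards all DuckDB writes for one org."""
    return _org_locks[org_id]


async def handle_request(org_id: str, name: str, log: List[str]) -> None:
    # Requests for the same org queue up here; other orgs are unaffected.
    async with org_lock(org_id):
        log.append(f"start {name}")
        await asyncio.sleep(0.01)  # stands in for the DuckDB write
        log.append(f"end {name}")


async def demo() -> List[str]:
    log: List[str] = []
    await asyncio.gather(
        handle_request("org-a", "a1", log),
        handle_request("org-a", "a2", log),
        handle_request("org-b", "b1", log),
    )
    return log
```

Running `asyncio.run(demo())` shows `a2` starting only after `a1` ends, while `b1` starts without waiting for either. The lock only serializes within one replica, which is why the plan also keeps an S3 outer lock per project.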

Side-by-side comparison

| Aspect | Today | Proposed |
|---|---|---|
| Bronze trigger | S3 event on _manifest.json | Direct HTTPS POST to the service |
| Silver trigger | S3 event on bronze.duckdb.zip | SQS message |
| .NET intermediary | Raw-Data-Loader (.NET) | Eliminated |
| Datalake runtime | ECS Fargate one-shot (1 task per tick) | ECS Fargate Service (long-lived, autoscale) |
| Response to the agent | Fire-and-forget (no typed errors) | Typed 200/409/422/5xx |
| Failure recovery | Operator inspects S3 + aws s3 rm | Idempotent re-PUT/re-finalize |
| DLQ / retry visibility | ❌ S3 events have none | ✅ SQS DLQ + retry counters |
| DuckDB concurrency | 1 task = 1 project (Fargate isolates) | 1 lock per org (asyncio) |
| Future growth (gold) | One more Lambda+S3 event pair | One more SQS+Lambda pair, same pattern |

Architecture

Trigger flow (proposed)

upflux-datapulse-agent          upflux-datapulse-datalake (long-lived)        upflux-datapulse-transformer
(on-prem)                        ECS Fargate Service + ALB                     (silver worker — ECS one-shot)
       │                                       │                                          │
       │ HTTPS PUT parquet parts               │                                          │
       │ ────────────────────────────────────▶ │                                          │
       │ HTTPS POST finalize (manifest body)   │ in-process DuckDB ingest                 │
       │ ────────────────────────────────────▶ │ (per-org asyncio.Lock,                   │
       │                                       │  warm writer registry, CHECKPOINT)       │
       │                                       │                                          │
       │                                       │ same request:                            │
       │                                       │   1. zip /tmp/<pid>/bronze.duckdb        │
       │                                       │   2. S3 PUT state/<pid>/bronze.duckdb.zip│
       │                                       │   3. SQS SendMessage  ──────────────▶  upflux-datapulse-bronze-completed
       │                                       │   4. DynamoDB write bronze_committed (audit)           │
       │ ◀──── 200 OK {tick_id, status} ────── │                                                        ▼
       │                                                                                       silver_trigger Lambda
                                                                                                (SQS event source)
                                                                                                       │
                                                                                                       ▼
                                                                                                ecs.run_task (silver)

Stack & framework

HTTP contract

Two endpoints, no init-handle ceremony. tick_id is the natural idempotency key (already globally unique in state.py:78).

PUT  /v1/projects/{project_id}/ticks/{tick_id}/parts/{file_seq}
     Body: application/octet-stream (parquet bytes)
     Headers: X-Sha256: <hex>, X-Api-Key, X-Api-Secret
     Returns:
       202 Accepted (first upload, persisted)
       200 OK {already_present: true} (re-PUT same sha — idempotent)
       409 Conflict (different sha for same file_seq — refuses silent overwrite)

POST /v1/projects/{project_id}/ticks/{tick_id}/finalize
     Body: the _manifest.json verbatim
     Headers: X-Api-Key, X-Api-Secret
     Returns:
       200 OK {tick_id, status: "committed", rows_inserted, rows_replaced, committed_at}
              — synchronous: ingest + CHECKPOINT + S3 upload + SQS publish + DynamoDB audit-write all done before returning
       409 Conflict {missing: [...], mismatched_sha: [...]} — agent re-PUTs the bad parts
       422 Unprocessable {error_class, message} — manifest invalid, agent marks tick failed_manual
       425 Too Early — concurrent finalize in flight, retry with Retry-After
       5xx — server error, exponential backoff retry

PUT per part matches the agent's existing pagination (<stem>_seq=NNN.parquet from _PaginatedParquetWriter). POST /finalize is the integrity check: the server validates manifest.tables[*].parquets[*].sha256 against what was uploaded; if any part is missing or has a different sha, it returns a typed 409 so the agent re-PUTs only the bad parts.
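The status-code rules of both endpoints can be sketched as two pure functions. This is an illustration under assumptions: the function names are hypothetical, and the manifest is flattened here to a simple `file_seq -> sha256` mapping rather than the real `tables[*].parquets[*]` structure.

```python
from typing import Dict, List, Optional


def put_part_status(stored_sha: Optional[str], incoming_sha: str) -> int:
    """Status code for PUT .../parts/{file_seq}, following the contract above."""
    if stored_sha is None:
        return 202  # first upload, persisted
    if stored_sha == incoming_sha:
        return 200  # idempotent re-PUT ({already_present: true})
    return 409      # different sha for the same file_seq, no silent overwrite


def check_manifest(
    declared: Dict[int, str], received: Dict[int, str]
) -> Dict[str, List[int]]:
    """Compare the sha256 declared per file_seq with what was uploaded.

    Both lists empty means /finalize may proceed; otherwise the result is
    the body of the typed 409.
    """
    missing = sorted(seq for seq in declared if seq not in received)
    mismatched = sorted(
        seq
        for seq, sha in declared.items()
        if seq in received and received[seq] != sha
    )
    return {"missing": missing, "mismatched_sha": mismatched}
```

For example, `check_manifest({0: "a", 1: "b", 2: "c"}, {0: "a", 1: "x"})` reports part 2 as missing and part 1 as mismatched, so the agent would re-send only those two.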

Why chunked over monolithic ZIP: failure granularity. A 200 MB tick that fails at MB 199 of a monolithic POST retransmits all 200 MB. A chunked upload retransmits one ~5 MB part. On a flaky on-prem WAN this matters.
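The retransmission cost can be worked out with a few lines of arithmetic. This sketch assumes a single mid-transfer failure and no resumable-upload support; the function name is illustrative.

```python
import math


def retransmitted_mb(total_mb: int, part_mb: int, chunked: bool) -> int:
    """MB that must be resent after one mid-transfer failure."""
    # Monolithic: the whole body is resent.
    # Chunked: only the part that was in flight is resent.
    return min(part_mb, total_mb) if chunked else total_mb


def part_count(total_mb: int, part_mb: int) -> int:
    """Number of PUT requests needed for a tick of total_mb."""
    return math.ceil(total_mb / part_mb)
```

With the figures from the paragraph above, a 200 MB tick in 5 MB parts costs 40 PUT requests, and one failure resends 5 MB instead of 200 MB.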

Sync per-request lifecycle (the load-bearing decision)

POST /finalize is synchronous through the entire bronze pipeline, not "fast-200 + background commit." Order:

  1. Validate manifest (pydantic). Auth dependency has already resolved org_id from X-Api-Key and asserted that project_id (from the path) belongs to that org.
  2. async with org_lock(org_id): — serializes all requests for this org on this replica.
  3. Acquire S3 outer lock for state/<pid>/lock if this replica doesn't already hold it for this project.
  4. Get-or-create BronzeWriter for project (downloads state/<pid>/bronze.duckdb.zip lazily on first request, stays warm in the per-project registry).
  5. tick_processor.process_tick_local(local_parquet_paths, manifest) — the existing process_tick from bronze/tick_processor.py:65, refactored to take local paths instead of s3.parquet_uri. DuckDB's read_parquet accepts local paths transparently.
  6. CHECKPOINT (forced, not deferred — this is the durability boundary).
  7. Zip /tmp/<pid>/bronze.duckdb and S3 PUT state/<pid>/bronze.duckdb.zip (existing state_sync.upload_state).
  8. SQS SendMessage — durable trigger for silver. If SQS fails, /finalize returns 5xx; agent retries; re-runs idempotent ingest + re-publishes SQS. SQS DLQ catches genuinely failed messages.
  9. Datalake updates its local SQLite ticks row to committed with result body cached (for idempotent re-finalize).
  10. Return 200 with RunSummary.
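The ordering guarantee in the steps above can be sketched as a small runner that executes named steps under the org lock and stops at the first failure. All names here (`run_finalize`, `FinalizeError`, the step labels) are hypothetical, and the steps are stubs rather than the real ingest, upload, or publish calls.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple

Step = Callable[[], Awaitable[None]]


class FinalizeError(Exception):
    """A step failed; the HTTP layer would map this to a 5xx."""

    def __init__(self, step: str, cause: Exception):
        super().__init__(f"{step} failed: {cause}")
        self.step = step


async def run_finalize(lock: asyncio.Lock, steps: List[Tuple[str, Step]]) -> List[str]:
    """Run steps in order under the lock; later steps never run after a failure."""
    done: List[str] = []
    async with lock:
        for name, step in steps:
            try:
                await step()
            except Exception as exc:
                raise FinalizeError(name, exc) from exc
            done.append(name)
    return done


async def demo(fail_at: Optional[str] = None) -> Tuple[List[str], Optional[str]]:
    calls: List[str] = []

    def make(name: str) -> Step:
        async def step() -> None:
            if name == fail_at:
                raise RuntimeError(f"{name} unavailable")
            calls.append(name)
        return step

    names = ["ingest", "checkpoint", "s3_upload", "sqs_send", "mark_committed"]
    try:
        await run_finalize(asyncio.Lock(), [(n, make(n)) for n in names])
        return calls, None
    except FinalizeError as err:
        return calls, err.step
```

`asyncio.run(demo("sqs_send"))` stops after the S3 upload and never marks the tick committed, so the agent's retry re-runs the idempotent path and publishes again.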

Acks dropped entirely — no _bronze_ack.json, no _silver_ack.json. Their three legacy roles split out:

Why sync over fast-200-async: the user's framing maps to sync ("zip the database and upload it to S3 → on that upload, immediately send a tick to the SQS queue"). It also gives strong consistency: silver downloads bronze.duckdb.zip from S3, and the SQS message that triggers it always points at a state that is already in S3. Eventual upload plus a decoupled SQS publish would make silver race the file.

SQS

Server-side state

A small SQLite at WORK_DIR/server.sqlite holds the per-tick coordination state (NOT in DuckDB — keeps WAL flushes from interfering with sync HTTP responses):

CREATE TABLE ticks (
  tick_id PRIMARY KEY,
  project_id, org_id,
  state CHECK (state IN ('uploading', 'finalizing', 'committed', 'failed')),
  parts_received,          -- JSON text: {file_seq: {sha256, bytes, received_at}}
  manifest,                -- JSON text, nullable
  result,                  -- JSON text, nullable; cached response body for idempotent re-finalize
  created_at, updated_at, committed_at
);

Re-POST of /finalize after committed returns the cached result 200 — handles "agent crashed right after server committed but before the agent saw the response."
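The cached-result replay can be sketched with the standard-library `sqlite3` module. The table here is trimmed to the three columns the replay needs, and `finalize_once` and its `commit` callback are hypothetical names; `commit` stands in for the whole ingest, CHECKPOINT, S3, and SQS sequence.

```python
import json
import sqlite3
from typing import Callable, Dict

# Trimmed-down version of the ticks table, enough for the replay logic.
SCHEMA = """
CREATE TABLE IF NOT EXISTS ticks (
    tick_id TEXT PRIMARY KEY,
    state   TEXT NOT NULL,
    result  TEXT
)
"""


def finalize_once(
    db: sqlite3.Connection, tick_id: str, commit: Callable[[], Dict]
) -> Dict:
    """Run commit() at most once per tick_id; replay the cached body afterwards."""
    row = db.execute(
        "SELECT state, result FROM ticks WHERE tick_id = ?", (tick_id,)
    ).fetchone()
    if row is not None and row[0] == "committed":
        return json.loads(row[1])  # re-POST after commit: cached 200 body
    result = commit()  # stands in for ingest + CHECKPOINT + S3 + SQS
    db.execute(
        "INSERT OR REPLACE INTO ticks (tick_id, state, result) "
        "VALUES (?, 'committed', ?)",
        (tick_id, json.dumps(result)),
    )
    db.commit()
    return result
```

Calling `finalize_once` twice with the same `tick_id` invokes `commit` only once; the second call returns the stored body, which is the behaviour an agent sees when it retries after a lost response.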

Failure modes

| Scenario | Recovery |
|---|---|
| Agent crashes mid-PUT (one part) | On restart, agent's state.list_ready_ticks() still shows the tick; uploader re-runs. Re-PUT of parts already at server: 200 already_present. Missing parts: PUT them. Then POST /finalize. |
| Agent crashes between final PUT and /finalize | Same. Server has parts in uploading state. Next agent boot re-PUTs (idempotent), then finalizes. |
| Server crashes mid-PUT | TCP RST → agent retries (existing exponential backoff). If part landed on disk before crash, server re-receives + sha-matches: 200. Otherwise full re-receive. |
| Server crashes after DuckDB commit but before responding 200 | Agent times out, retries POST /finalize. Server's ticks.state == 'committed', returns cached result 200. |
| S3 upload of bronze.duckdb.zip fails after DuckDB commit | /finalize returns 5xx; SQS not published; DynamoDB not written; agent retries. Idempotent re-run uploads, publishes, persists. |
| SQS SendMessage fails after S3 upload | /finalize returns 5xx; agent retries → idempotent ingest → re-upload bronze.duckdb.zip (idempotent on same content) → re-publish SQS. SQS retry covers transient outages; SQS DLQ catches genuinely failed messages after 5 attempts. |
| Silver SQS message lost (rare: DLQ failure or queue purge) | DLQ depth alarm fires. Operator queries DynamoDB bronze_committed for ticks without matching silver_dispatched row, re-enqueues SQS messages from a recovery script. |
| Silver ECS task fails mid-batch | SQS visibility timeout expires → message returns to queue → next Lambda invocation re-dispatches. DynamoDB silver_dispatched not written for failed ticks (write happens before run_task; if run_task succeeds but ECS task fails, conditional check on retry catches it). |
| Manifest fails validation | 422 with typed error. Agent marks tick failed_manual (existing state.mark_tick_failed_manual). |
| Part sha doesn't match what manifest declared | 409 from /finalize with {"mismatched_sha": [seq=2]}. Agent re-PUTs that part (DELETE-then-PUT, since same-key different-sha is refused on PUT). |

The story for the director: every failure has a typed response code and a deterministic retry path. No "did the S3 event fire? did Lambda dedup? did ECS find a slot?" guesswork.
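On the agent side, the typed responses from /finalize reduce to a small decision table. The following is a sketch only: the `Action` enum, `next_action`, and the backoff parameters (`base`, `cap`) are assumptions, not the agent's existing code.

```python
from enum import Enum


class Action(Enum):
    DONE = "tick committed"
    REPUT_PARTS = "re-PUT the listed parts, then finalize again"
    FAIL_MANUAL = "mark tick failed_manual"
    RETRY_LATER = "wait for Retry-After, then finalize again"
    BACKOFF = "retry with exponential backoff"


def next_action(status: int) -> Action:
    """Map a /finalize status code to the agent's next move."""
    if status == 200:
        return Action.DONE
    if status == 409:
        return Action.REPUT_PARTS
    if status == 422:
        return Action.FAIL_MANUAL
    if status == 425:
        return Action.RETRY_LATER
    if 500 <= status < 600:
        return Action.BACKOFF
    raise ValueError(f"unexpected status {status}")


def backoff_seconds(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential backoff delay for the given zero-based attempt, capped."""
    return min(cap, base * 2 ** attempt)
```

Any status outside the contract raises instead of being retried blindly, which keeps contract drift between agent and server visible.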

Per-repo changes

upflux-datapulse-datalake (the heaviest lift)

upflux-datapulse-agent

upflux-datapulse-transformer

upflux-datapulse-backend (Hub)

raw-data-loader (.NET, separate repo)

Phases / milestones

Hard cutover (nothing in production yet — no live tenants, no parallel run, no shadow-write parity needed).

  1. Spec + infra (1 sprint): SQS queue + DLQ + IAM (datalake task role gains sqs:SendMessage; silver Lambda gains sqs:ReceiveMessage/etc), ALB + target group + ECS service definition for the new long-lived datalake. CloudWatch alarms (5xx rate, finalize p95 latency, SQS queue depth, DLQ size).
  2. Datalake service v1 (2 sprints): in upflux-datapulse-datalake/bronze/service/ — FastAPI scaffold, two routes (PUT parts, POST finalize), writer registry, ingest path (refactor tick_processor.process_tick to take local paths), server-side SQLite for ticks coordination state, SIGTERM drain, observability (loguru + APM transactions + Prometheus/EMF metrics), Dockerfile entrypoint flip. Smoke against MinIO + LocalStack SQS via docker-compose.
  3. Agent uploader v2 (1 sprint): rename rdl_uploader.py → bronze_uploader.py, rewrite the upload step to chunked PUT + finalize POST, delete send_to_raw_data_loader, replace raw_data_loader_url with datalake_url in config snapshot. Smoke E2E: agent (local) → datalake service (docker-compose) → MinIO → LocalStack SQS → silver Lambda (LocalStack) → silver ECS (local). Assert silver_dispatched row appears in DynamoDB Local + silver.duckdb.zip lands in MinIO with the expected rows.
  4. Silver Lambda swap (3 days): silver_trigger/handler.py event-source S3 → SQS, parse event["Records"][i]["body"] instead of regex on S3 key, add DynamoDB silver_dispatched conditional-put for SQS at-least-once handling. Update CLAUDE.md trigger-chain diagram.
  5. Cutover deploy: ship the four repo PRs together; tag releases; cut over in a single deploy window. Monitor SQS queue depth + DLQ + finalize 5xx for 48h. Roll back path = redeploy prior images (no data loss because nothing committed).
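The silver Lambda change in step 4 can be sketched as follows. The message body fields (`tick_id`, `project_id`) are assumed, since the plan does not fix the payload schema, and `claim_dispatch` is an in-memory stand-in for the DynamoDB conditional put on silver_dispatched. In the real handler that would presumably be a put_item call with a condition such as `attribute_not_exists(tick_id)`.

```python
import json
from typing import Any, Callable, Dict, List, Set


def claim_dispatch(seen: Set[str], tick_id: str) -> bool:
    """In-memory stand-in for a conditional put: True only for the first claim."""
    if tick_id in seen:
        return False
    seen.add(tick_id)
    return True


def handler(
    event: Dict[str, Any],
    seen: Set[str],
    run_task: Callable[[str, str], None],
) -> List[str]:
    """Dispatch one silver task per tick, skipping SQS duplicate deliveries."""
    dispatched: List[str] = []
    for record in event["Records"]:
        body = json.loads(record["body"])  # replaces the regex on the S3 key
        tick_id = body["tick_id"]          # assumed field name
        if not claim_dispatch(seen, tick_id):
            continue  # already dispatched: at-least-once duplicate
        run_task(body["project_id"], tick_id)  # stands in for ecs.run_task
        dispatched.append(tick_id)
    return dispatched
```

Because the claim is written before `run_task`, a duplicate delivery of the same message is dropped rather than launching a second ECS task for the same tick.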

RDL retirement is a separate plan, opened after step 5 stabilizes.

Verification

Critical files

Decisions confirmed with user