The director isn't confident in the current bronze trigger pattern. Today the chain is:
```text
agent (on-prem) → ZIP via HTTPS → raw-data-loader (.NET) → unzip → S3 PUT
S3 PUT on _manifest.json → S3 event → Lambda(bronze-trigger) → ecs.run_task → ECS Fargate one-shot (datalake worker)
ECS finishes → S3 PUT on state/<pid>/bronze.duckdb.zip
S3 PUT on bronze.duckdb.zip → S3 event → Lambda(silver-trigger) → ecs.run_task → ECS Fargate one-shot (transformer worker)
```
Three reasons the director is right to push back:
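1. **Duplicate delivery.** S3 event notifications are at-least-once, so one PUT can fire more than once (for example when a multipart upload completes). That is the only reason the dedup check (the `_has_running_silver_task` ECS task-list scan in `silver_trigger/handler.py:129`) exists.
2. **Manual, opaque recovery.** There is no DLQ and no retry visibility. Recovering a failed tick means an operator inspecting S3 for `_bronze_ack.json` and running `aws s3 rm` to replay.
3. **No typed errors.** There is no typed failure mode at the boundary: the agent's upload is fire-and-forget into S3.

The proposed flow eliminates both event hops and the .NET intermediary: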
```text
agent (on-prem) → HTTPS POST → long-lived datalake service → in-process DuckDB ingest
       │
       ↓ on tick complete (sync, same request)
zip bronze.duckdb → S3 PUT
       │
       ↓ same request, before returning 200
SQS SendMessage (upflux-datapulse-bronze-completed)
       │
       ↓ silver Lambda subscribes to SQS, not S3
ecs.run_task (silver/transformer worker, unchanged)
```
Side-effect benefits:
- The `raw-data-loader` .NET service can be retired (one less language stack, one less deploy target, one less auth boundary).
- The agent gets typed responses (`200 ok`, `409 already_processed`, `422 invalid_manifest`, `5xx` retry) instead of fire-and-forget into S3.

Costs accepted:
- … `state.pending_uploads`); the existing exponential backoff (`sender.py:54`) handles outages cleanly.

Important context: none of this is officially running in production yet. There are no live tenants on the current S3-events trigger chain and no historical `bronze.duckdb` to migrate. That justifies a hard cutover (no feature flag, no shadow-write parity window). It also lets us delete the RDL upload path from the agent in the same PR that introduces the chunked uploader. The FastAPI service code lives inside the existing `upflux-datapulse-datalake` repo under `bronze/service/`. We are not spinning up a new repo or a new service identity.
Red = triggers via S3 event (the director's concern: at-least-once, no DLQ, no retry visibility, no backpressure). Yellow = the intermediary .NET service that will be retired.
Green = new service (long-lived Datalake). Blue = SQS replaces the S3 event as the trigger. Orange = DynamoDB replaces the `_*_ack.json` files (audit + dedup).
The RDL .NET service goes away. The bronze-trigger Lambda goes away. The future silver→gold layer repeats the SQS+DynamoDB pattern.
Within one org, all requests serialize: one DuckDB write at a time, so the single-writer invariant holds. Across different orgs, requests run in parallel. DuckDB concurrency stays low naturally, because most orgs have only 1-2 projects and a low tick rate (~1/min).
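The per-org serialization above can be sketched with one lazily created `asyncio.Lock` per `org_id`. This is a minimal illustration under assumptions, not the planned `writer_registry.py`:

- `org_lock` and `fake_finalize` are hypothetical names.
- The sleep stands in for the DuckDB ingest.
- The counters record peak concurrency per org and overall, so both halves of the claim can be checked: serialized within an org, parallel across orgs.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager

# Hypothetical registry: one asyncio.Lock per org_id, created on first use.
_org_locks: dict[str, asyncio.Lock] = {}


@asynccontextmanager
async def org_lock(org_id: str):
    lock = _org_locks.get(org_id)
    if lock is None:
        # No await between get() and assignment, so on a single event loop
        # two coroutines cannot both create a lock for the same org.
        lock = _org_locks[org_id] = asyncio.Lock()
    async with lock:
        yield


per_org_in_flight: dict[str, int] = defaultdict(int)
per_org_peak: dict[str, int] = defaultdict(int)
total = {"in_flight": 0, "peak": 0}


async def fake_finalize(org_id: str, project_id: str) -> None:
    # Stand-in for the DuckDB ingest; only the locking behaviour matters here.
    async with org_lock(org_id):
        per_org_in_flight[org_id] += 1
        total["in_flight"] += 1
        per_org_peak[org_id] = max(per_org_peak[org_id], per_org_in_flight[org_id])
        total["peak"] = max(total["peak"], total["in_flight"])
        await asyncio.sleep(0.01)  # simulated ingest time
        per_org_in_flight[org_id] -= 1
        total["in_flight"] -= 1


async def main() -> None:
    await asyncio.gather(
        fake_finalize("org-a", "proj-1"),
        fake_finalize("org-a", "proj-2"),  # same org: waits for proj-1
        fake_finalize("org-b", "proj-3"),  # other org: runs alongside
    )


asyncio.run(main())
print(dict(per_org_peak), total["peak"])  # {'org-a': 1, 'org-b': 1} 2
```

Each org never has more than one request inside its critical section, while the overall peak of 2 shows that `org-b` did not wait for `org-a`.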
| Aspect | Today | Proposed |
|---|---|---|
| Bronze trigger | S3 event on `_manifest.json` | Direct HTTPS POST to the service |
| Silver trigger | S3 event on `bronze.duckdb.zip` | SQS message |
| .NET intermediary | Raw-Data-Loader (.NET) | Eliminated |
| Datalake runtime | ECS Fargate one-shot (1 task per tick) | ECS Fargate Service (long-lived, autoscale) |
| Response to the agent | Fire-and-forget (no typed errors) | Typed 200/409/422/5xx |
| Failure recovery | Operator inspects S3 + `aws s3 rm` | Idempotent re-PUT/re-finalize |
| DLQ / retry visibility | ❌ S3 events have none | ✅ SQS DLQ + retry counters |
| DuckDB concurrency | 1 task = 1 project (Fargate isolates) | 1 lock per org (asyncio) |
| Future growth (gold) | One more Lambda + S3 event pair | One more SQS + Lambda pair (uniform) |
```text
upflux-datapulse-agent                  upflux-datapulse-datalake (long-lived)        upflux-datapulse-transformer
(on-prem)                               ECS Fargate Service + ALB                     (silver worker, ECS one-shot)
  │                                       │                                             │
  │  HTTPS PUT parquet parts              │                                             │
  │ ────────────────────────────────────▶ │                                             │
  │  HTTPS POST finalize (manifest body)  │ in-process DuckDB ingest                    │
  │ ────────────────────────────────────▶ │ (per-org asyncio.Lock,                      │
  │                                       │  warm writer registry, CHECKPOINT)          │
  │                                       │                                             │
  │                                       │ same request:                               │
  │                                       │ 1. zip /tmp/<pid>/bronze.duckdb             │
  │                                       │ 2. S3 PUT state/<pid>/bronze.duckdb.zip     │
  │                                       │ 3. SQS SendMessage ───────────────────────▶ upflux-datapulse-bronze-completed
  │                                       │ 4. DynamoDB write bronze_committed (audit)  │
  │ ◀────── 200 OK {tick_id, status} ──── │                                             ▼
                                                                                        silver_trigger Lambda
                                                                                        (SQS event source)
                                                                                        │
                                                                                        ▼
                                                                                        ecs.run_task (silver)
```
- **Framework: FastAPI.** The backend (`upflux-datapulse-backend/main.py:14`) already runs this exact stack. Reuse `app/core/apm_context_middleware.py`, `app/helpers/logging_setup.py`, and the loguru bootstrap. Pydantic v2 is already a dependency via `pydantic-settings`.
- **Concurrency model.** Agents authenticate with `X-Api-Key=org_id`, so the natural sharding boundary is the org.
  - Within one org, all incoming requests (regardless of which `project_id` they target) serialize through a single `asyncio.Lock`. Cross-org requests parallelize freely.
  - This bounds DuckDB write concurrency to the per-org request rate, which is low because one customer doesn't fan out aggressively. It also keeps the lock dict small.
  - `dict[org_id, asyncio.Lock]`: the org-level inner ring.
  - `dict[project_id, BronzeWriter]`: the warm writer registry. DuckDB connections are still per-project (each project has its own `bronze.duckdb`) and are evicted on idle TTL or LRU pressure. Inside the org lock, the route resolves the project's writer.
  - The existing S3 lock (`bronze/lock.py`) is repurposed as the outer ring. It uses the same key as today (`state/<pid>/lock`) and guards against multiple service replicas owning the same project's DuckDB file. It is acquired lazily on the first request for that project and held until the writer is evicted from the registry.
- **Deployment.** ECS Fargate Service, autoscaling on `RequestCountPerTarget`.
  - ALB with a `/healthz` health check and a deregistration delay of 90 s, longer than the 60 s SIGTERM grace.
  - The ALB is public-facing; agents reach it over the same HTTPS path they use for the Hub today.
- **Retention.** `BRONZE_RETENTION_DAYS=365`.
- **Auth.** `X-Api-Key` (org_id) + `X-Api-Secret` (Fernet-encrypted, decrypted via `payload_key` from the Hub config snapshot). The agent already has the machinery in `sender.py:27`. Tighten the server side: validate the `(org_id, project_id)` tuple instead of trusting `org_id` alone.

Two endpoints, no init-handle ceremony. `tick_id` is the natural idempotency key (already globally unique in `state.py:78`).
```text
PUT /v1/projects/{project_id}/ticks/{tick_id}/parts/{file_seq}
  Body:    application/octet-stream (parquet bytes)
  Headers: X-Sha256: <hex>, X-Api-Key, X-Api-Secret
  Returns:
    202 Accepted                     first upload, persisted
    200 OK {already_present: true}   re-PUT with the same sha (idempotent)
    409 Conflict                     different sha for the same file_seq (refuses silent overwrite)

POST /v1/projects/{project_id}/ticks/{tick_id}/finalize
  Body:    the _manifest.json, verbatim
  Headers: X-Api-Key, X-Api-Secret
  Returns:
    200 OK {tick_id, status: "committed", rows_inserted, rows_replaced, committed_at}
           synchronous: ingest + CHECKPOINT + S3 upload + SQS publish + DynamoDB audit write
           all complete before the response is returned
    409 Conflict {missing: [...], mismatched_sha: [...]}   agent re-PUTs the bad parts
    422 Unprocessable {error_class, message}               manifest invalid; agent marks tick failed_manual
    425 Too Early                                          concurrent finalize in flight; retry after Retry-After
    5xx                                                    server error; retry with exponential backoff
```
One `PUT` per part matches the agent's existing pagination (`<stem>_seq=NNN.parquet` from `_PaginatedParquetWriter`). `POST /finalize` is the integrity check. The server validates `manifest.tables[*].parquets[*].sha256` against what was uploaded. If any part is missing or mismatched, it returns a typed `409` so the agent re-PUTs only the bad parts.
Why chunked over a monolithic ZIP: failure granularity. A 200 MB tick that fails at MB 199 of a monolithic POST retransmits all 200 MB; the chunked upload retransmits one ~5 MB part. On a flaky on-prem WAN this matters.
POST /finalize is synchronous through the entire bronze pipeline, not "fast-200 + background commit." Order:
1. The auth dependency has resolved `org_id` from `X-Api-Key` and asserted that `project_id` (from the path) belongs to that org.
2. `async with org_lock(org_id):` serializes all requests for this org on this replica.
3. Acquire the S3 lock `state/<pid>/lock` if this replica doesn't already hold it for this project.
4. Resolve the `BronzeWriter` for the project. It downloads `state/<pid>/bronze.duckdb.zip` lazily on the first request, then stays warm in the per-project registry.
5. Ingest with `tick_processor.process_tick_local(local_parquet_paths, manifest)`. This is the existing `process_tick` from `bronze/tick_processor.py:65`, refactored to take local paths instead of `s3.parquet_uri`. DuckDB's `read_parquet` accepts local paths transparently.
6. CHECKPOINT, zip `/tmp/<pid>/bronze.duckdb`, and S3 PUT `state/<pid>/bronze.duckdb.zip` (existing `state_sync.upload_state`).
7. SQS `SendMessage` to `upflux-datapulse-bronze-completed`. If this fails, `/finalize` returns 5xx and the agent retries, which re-runs the idempotent ingest and re-publishes to SQS. The SQS DLQ catches genuinely failed messages.
8. Update the SQLite `ticks` row to `committed`, with the result body cached for idempotent re-finalize.
9. Write the `bronze_committed` audit row to DynamoDB.
10. Return `200 OK` (… `RunSummary`).

Acks are dropped entirely: no `_bronze_ack.json`, no `_silver_ack.json`. Their three legacy roles split out:
- **Replay.** `aws sqs send-message` re-enqueues a tick for silver re-processing; a `silver replay --tick <id>` CLI wraps this. To re-process the same tick in bronze, the agent re-PUTs the parts and re-finalizes (idempotent on `tick_id`).
- **Audit and dedup.** Two DynamoDB tables:
  - `bronze_committed`, keyed on `tick_id`, written by the datalake.
  - `silver_dispatched`, keyed on `tick_id`, written by the silver Lambda.

  Both have a TTL of ~30 days for cost control.
- **Ad-hoc inspection.** CloudWatch Logs Insights queries the rest.

Why sync over fast-200-async: the user's framing maps to sync ("zip the database and upload it to S3 → with that upload, send a tick to the SQS queue right away"). It also gives strong consistency. Silver downloads `bronze.duckdb.zip` from S3, and the SQS message that triggers it always points at a state already in S3. An eventual upload with a decoupled SQS publish would make silver race the file.
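A sketch of that synchronous ordering, with each side effect injected as a plain callable so the failure semantics can be exercised without AWS. `FinalizeDeps`, `finalize`, and the fake dependencies are hypothetical. The sketch only shows two things:

- The SQS publish happens strictly after the S3 PUT returns.
- A failure at any step stops everything after it; the route would turn the exception into a 5xx.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class FinalizeDeps:
    """Hypothetical injected side effects, one per step of the finalize order."""
    ingest: Callable[[str], dict]                # DuckDB ingest + CHECKPOINT -> row counts
    upload_state: Callable[[str], str]           # zip + S3 PUT -> ETag of the new object
    publish: Callable[[dict], None]              # SQS SendMessage
    mark_committed: Callable[[str, dict], None]  # SQLite ticks row -> 'committed'
    audit: Callable[[dict], None]                # DynamoDB bronze_committed PutItem


def finalize(tick_id: str, deps: FinalizeDeps) -> dict:
    # Runs while the org lock is held; any exception propagates and the
    # route maps it to a 5xx, so nothing after the failing step executes.
    counts = deps.ingest(tick_id)
    etag = deps.upload_state(tick_id)  # the SQS message is only built once S3 has the object
    deps.publish({"tick_id": tick_id, "bronze_db_etag": etag})
    result = {"tick_id": tick_id, "status": "committed", **counts}
    deps.mark_committed(tick_id, result)
    deps.audit({**result, "bronze_db_etag": etag})
    return result


calls: list[str] = []


def make_deps(upload_ok: bool) -> FinalizeDeps:
    def ingest(tick_id: str) -> dict:
        calls.append("ingest")
        return {"rows_inserted": 3, "rows_replaced": 0}

    def upload_state(tick_id: str) -> str:
        calls.append("upload")
        if not upload_ok:
            raise ConnectionError("S3 PUT failed")
        return '"etag-1"'

    return FinalizeDeps(
        ingest=ingest,
        upload_state=upload_state,
        publish=lambda msg: calls.append("publish"),
        mark_committed=lambda tick_id, result: calls.append("commit"),
        audit=lambda row: calls.append("audit"),
    )


try:
    finalize("t1", make_deps(upload_ok=False))
except ConnectionError:
    pass
print(calls)  # ['ingest', 'upload']: no SQS message, no audit row
calls.clear()
print(finalize("t1", make_deps(upload_ok=True))["status"])  # committed
print(calls)  # ['ingest', 'upload', 'publish', 'commit', 'audit']
```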
Queue: `upflux-datapulse-bronze-completed`.

- Standard, not FIFO; silver dedupes by `tick_id`.
- Visibility timeout 15 min (silver's tick-processing budget).
- Retention 4 days (covers weekend backlogs).
- DLQ at `maxReceiveCount=5`.

Message body:

```json
{
  "schema_version": "1.0",
  "org_id": "<org-code>",
  "project_id": "<uuid>",
  "tick_id": "20260507T120000__abc",
  "mode": "delta",
  "dt": "20260507",
  "bronze_db_key": "state/<pid>/bronze.duckdb.zip",
  "bronze_db_etag": "<etag of the upload that triggered this message>",
  "emitted_at": "2026-05-07T12:00:30Z"
}
```
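A sketch of building and publishing the message body above, assuming a boto3 SQS client. `send_message(QueueUrl=..., MessageBody=...)` returning a dict with `MessageId` is the real boto3 call. `build_bronze_completed`, `publish_bronze_completed`, `FakeSQS`, and the queue URL are hypothetical; the fake only lets the example run without AWS. The sketch also assumes that `<pid>` in `bronze_db_key` is the `project_id`.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def build_bronze_completed(*, org_id: str, project_id: str, tick_id: str, mode: str,
                           dt: str, bronze_db_etag: str,
                           now: Optional[datetime] = None) -> dict:
    """Message body for upflux-datapulse-bronze-completed (schema_version 1.0)."""
    emitted = (now or datetime.now(timezone.utc)).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "schema_version": "1.0",
        "org_id": org_id,
        "project_id": project_id,
        "tick_id": tick_id,
        "mode": mode,
        "dt": dt,
        "bronze_db_key": f"state/{project_id}/bronze.duckdb.zip",
        "bronze_db_etag": bronze_db_etag,
        "emitted_at": emitted,
    }


def publish_bronze_completed(sqs_client, queue_url: str, message: dict) -> str:
    """Send one message; in production sqs_client would be boto3.client("sqs")."""
    resp = sqs_client.send_message(QueueUrl=queue_url, MessageBody=json.dumps(message))
    return resp["MessageId"]


class FakeSQS:
    """Stand-in accepting the same send_message keyword arguments as boto3."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, str]] = []

    def send_message(self, *, QueueUrl: str, MessageBody: str) -> dict:
        self.sent.append((QueueUrl, MessageBody))
        return {"MessageId": f"msg-{len(self.sent)}"}


msg = build_bronze_completed(
    org_id="acme", project_id="p-1", tick_id="20260507T120000__abc",
    mode="delta", dt="20260507", bronze_db_etag='"3f2a"',
    now=datetime(2026, 5, 7, 12, 0, 30, tzinfo=timezone.utc),
)
fake = FakeSQS()
print(publish_bronze_completed(fake, "https://sqs.example/queue", msg))  # msg-1
print(msg["emitted_at"])  # 2026-05-07T12:00:30Z
```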
- **No `manifest_key`.** The manifest never lands on S3; it lives only in the datalake's local SQLite and the DynamoDB audit table.
- **`bronze_db_etag`.** Lets silver assert that it is reading the same `bronze.duckdb.zip` version that triggered the message, as a defense against a stale download. S3 has offered strong read-after-write consistency since December 2020. In practice, this catches a later tick's upload replacing the object before silver reads it.
- **`tick_id` routing.** `tick_id` is now load-bearing for silver routing: the silver ECS task processes exactly this tick, or a batch of them (see the Lambda batching below).
- **Silver Lambda batching.** `BatchSize: 10` and `MaximumBatchingWindowInSeconds: 5`.
  - The Lambda receives up to 10 SQS messages per invocation and groups them by `(org_id, project_id)`.
  - It dispatches one ECS task per group with a `TICK_IDS=<comma-separated>` env var, and the ECS task processes exactly that list.
  - This amortizes Fargate cold start (one task processes N ticks) without needing an S3 sweep.
- **Silver dedup.** Replace the `_has_running_silver_task` ECS task-list scan with a DynamoDB `silver_dispatched` table keyed on `tick_id`, with a 30-day TTL. A conditional `attribute_not_exists(tick_id)` write before `ecs.run_task` covers:
  - (a) duplicate SQS messages;
  - (b) Lambda retries;
  - (c) re-enqueues during ops replay (the operator clears the row first, then re-enqueues).

  The same table doubles as the audit trail.
- **Bronze audit.** DynamoDB `bronze_committed` table keyed on `tick_id`, written by the datalake at finalize step §9, with a 30-day TTL. It stores `{tick_id, project_id, org_id, mode, committed_at, rows_inserted, rows_replaced, bronze_db_etag}` and replaces what `_bronze_ack.json` recorded.
- **IAM.**
  - The datalake task role gains `sqs:SendMessage` + `dynamodb:PutItem` on `bronze_committed`.
  - The silver Lambda gains `sqs:ReceiveMessage`/`DeleteMessage`/`GetQueueAttributes`/`ChangeMessageVisibility` + `dynamodb:PutItem`/`Query` on `silver_dispatched`.
  - Drop the `s3:GetObject` event source binding.

A small SQLite database at `WORK_DIR/server.sqlite` holds the per-tick coordination state. It deliberately lives outside DuckDB, which keeps WAL flushes from interfering with synchronous HTTP responses:
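```sql
CREATE TABLE ticks (
    tick_id        TEXT PRIMARY KEY,
    project_id     TEXT NOT NULL,
    org_id         TEXT NOT NULL,
    state          TEXT NOT NULL
                   CHECK (state IN ('uploading', 'finalizing', 'committed', 'failed')),
    parts_received TEXT NOT NULL DEFAULT '{}',  -- JSON: {file_seq: {sha256, bytes, received_at}}
    manifest       TEXT,                        -- JSON, NULL until /finalize arrives
    result         TEXT,                        -- JSON: cached response body for idempotent re-finalize
    created_at     TEXT NOT NULL,
    updated_at     TEXT NOT NULL,
    committed_at   TEXT
);
```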
A re-POST of `/finalize` after `committed` returns the cached result with `200`. This handles the case where the agent crashed right after the server committed but before the agent saw the response.
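A minimal `sqlite3` sketch of the cached re-finalize path. It uses a trimmed `ticks` table with only the columns the example touches. `finalize_once` and the `do_commit` callback (standing in for the ingest, upload, and publish steps) are hypothetical names. The upsert syntax needs SQLite 3.24 or newer.

```python
import json
import sqlite3
from typing import Callable

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE ticks (
           tick_id TEXT PRIMARY KEY,
           state   TEXT NOT NULL
                   CHECK (state IN ('uploading', 'finalizing', 'committed', 'failed')),
           result  TEXT  -- JSON response body, set once committed
       )"""
)


def finalize_once(conn: sqlite3.Connection, tick_id: str,
                  do_commit: Callable[[], dict]) -> dict:
    row = conn.execute(
        "SELECT state, result FROM ticks WHERE tick_id = ?", (tick_id,)
    ).fetchone()
    if row is not None and row[0] == "committed":
        # The agent retried after a lost response: replay the cached body.
        return json.loads(row[1])
    result = do_commit()  # ingest + upload + publish; raises on failure
    with conn:  # the connection context manager commits (or rolls back) the upsert
        conn.execute(
            "INSERT INTO ticks (tick_id, state, result) VALUES (?, 'committed', ?) "
            "ON CONFLICT(tick_id) DO UPDATE SET state = excluded.state, "
            "result = excluded.result",
            (tick_id, json.dumps(result)),
        )
    return result


runs: list[str] = []


def commit() -> dict:
    runs.append("commit")
    return {"tick_id": "t1", "status": "committed", "rows_inserted": 5}


first = finalize_once(conn, "t1", commit)
second = finalize_once(conn, "t1", commit)
print(first == second, runs)  # True ['commit']
```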
| Scenario | Recovery |
|---|---|
| Agent crashes mid-PUT (one part) | On restart, the agent's `state.list_ready_ticks()` still shows the tick and the uploader re-runs. Re-PUTs of parts already on the server return `200 already_present`; missing parts are PUT. Then `POST /finalize`. |
| Agent crashes between the final PUT and `/finalize` | Same. The server has the parts in `uploading` state. On the next boot the agent re-PUTs (idempotent), then finalizes. |
| Server crashes mid-PUT | TCP RST → the agent retries (existing exponential backoff). If the part landed on disk before the crash, the server re-receives it, the sha matches, and it returns `200`. Otherwise it is a full re-receive. |
| Server crashes after the DuckDB commit but before responding `200` | The agent times out and retries `POST /finalize`. The server's `ticks.state == 'committed'`, so it returns the cached result with `200`. |
| S3 upload of `bronze.duckdb.zip` fails after the DuckDB commit | `/finalize` returns 5xx; SQS is not published; DynamoDB is not written; the agent retries. The idempotent re-run uploads, publishes, and persists. |
| SQS `SendMessage` fails after the S3 upload | `/finalize` returns 5xx. The agent retries → idempotent ingest → re-upload of `bronze.duckdb.zip` (idempotent on the same content) → SQS re-publish. SQS retry covers transient outages; the SQS DLQ catches genuinely failed messages after 5 attempts. |
| Silver SQS message lost (rare: DLQ failure or queue purge) | The DLQ depth alarm fires. The operator queries DynamoDB `bronze_committed` for ticks without a matching `silver_dispatched` row and re-enqueues SQS messages from a recovery script. |
| Silver ECS task fails mid-batch | SQS visibility timeout expires → the message returns to the queue → the next Lambda invocation re-dispatches. DynamoDB `silver_dispatched` is not written for failed ticks. The write happens before `run_task`; if `run_task` succeeds but the ECS task fails, the conditional check on retry catches it. |
| Manifest fails validation | `422` with a typed error. The agent marks the tick `failed_manual` (existing `state.mark_tick_failed_manual`). |
| Part sha doesn't match what the manifest declared | `409` from `/finalize` with `{"mismatched_sha": [2]}`. The agent re-PUTs that part (DELETE-then-PUT, since same-key different-sha is refused on PUT). |
The story for the director: every failure has a typed response code and a deterministic retry path. No "did the S3 event fire? did Lambda dedup? did ECS find a slot?" guesswork.
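On the agent side, the typed codes map to deterministic next steps, which can be sketched as a pure function from a `TickFinalizeError` to an action. Assumptions:

- The exception's `(status_code, body)` shape follows the plan for `finalize_tick`.
- `Action`, `next_action`, the 2 s base, and the 300 s cap are hypothetical. The real backoff parameters live in `sender.py:54`, which is not shown here.
- `Retry-After` is an HTTP header rather than part of the body, so the sketch takes it as a separate argument.

```python
from dataclasses import dataclass
from typing import Optional


class TickFinalizeError(Exception):
    def __init__(self, status_code: int, body: dict) -> None:
        super().__init__(f"finalize failed with HTTP {status_code}")
        self.status_code = status_code
        self.body = body


@dataclass(frozen=True)
class Action:
    kind: str                        # "reput" | "failed_manual" | "retry"
    file_seqs: tuple[int, ...] = ()
    delay_s: float = 0.0


def next_action(err: TickFinalizeError, attempt: int,
                retry_after_s: Optional[float] = None,
                base_s: float = 2.0, cap_s: float = 300.0) -> Action:
    code = err.status_code
    if code == 409:
        # Re-PUT only the parts the server reported as missing or mismatched.
        bad = set(err.body.get("missing", [])) | set(err.body.get("mismatched_sha", []))
        return Action("reput", tuple(sorted(bad)))
    if code == 422:
        return Action("failed_manual")  # manifest invalid: retrying cannot help
    backoff = min(cap_s, base_s * 2 ** attempt)  # capped exponential backoff
    if code == 425:
        # Concurrent finalize in flight: honour Retry-After when the server sent one.
        delay = float(retry_after_s) if retry_after_s is not None else backoff
        return Action("retry", delay_s=delay)
    if 500 <= code < 600:
        return Action("retry", delay_s=backoff)
    raise err  # any other status is outside the contract; surface it


print(next_action(TickFinalizeError(409, {"missing": [3], "mismatched_sha": [2]}), attempt=0))
# Action(kind='reput', file_seqs=(2, 3), delay_s=0.0)
print(next_action(TickFinalizeError(503, {}), attempt=3).delay_s)  # 16.0
```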
**Datalake, new files** under `bronze/service/`:
- `app.py`: FastAPI factory; the lifespan boots APM, the S3/SQS clients, and the writer-registry evictor.
- `routes.py`: the two endpoints + `/healthz` + `/v1/projects/{pid}/status`.
- `auth.py`: FastAPI dependency that resolves `X-Api-Key`/`X-Api-Secret` against Hub-cached per-project secrets.
- `writer_registry.py`: contains three pieces:
  - the `dict[org_id, asyncio.Lock]` (org-keyed inner ring);
  - the `dict[project_id, BronzeWriter]` (project-keyed warm registry);
  - an LRU/TTL evictor that calls `writer.close()` (CHECKPOINT + WAL guard) and re-uploads.
- `state_db.py`: SQLite for the per-tick coordination state.
- `sqs_publisher.py`: boto3 SQS wrapper with retries.
- `lifecycle.py`: SIGTERM grace (drain + close all warm writers + release S3 locks).

**Datalake, modified:**
- `bronze/tick_processor.py`: extract `process_tick_local(local_parquet_paths, manifest, writer)` from `process_tick` (`bronze/tick_processor.py:65`). The existing `_write_one` (line 215) already takes `parquet_uris` as a list; just feed it local paths.
- `bronze/runner.py`: `process_once()` becomes legacy/dev-only, kept for the `bronze replay` ops command. The production entrypoint moves to the FastAPI app.
- `main.py`: add a `bronze serve --host --port` Typer command. The existing `process`/`replay`/`status`/`backfill` commands are retained for ops.
- `Dockerfile`: the entrypoint changes from `bronze process --once` to `bronze serve --host 0.0.0.0 --port 8080`.
- `bronze/config.py`: `PROJECT_ID` is no longer a service-wide setting; it comes from the request path. Add `HTTP_HOST`, `HTTP_PORT`, `SQS_QUEUE_URL`, `WRITER_IDLE_TTL_SECONDS`, `SHUTDOWN_GRACE_SECONDS`.

**Datalake, removed:** `bronze/ack.py` is no longer used. Audit moves to the DynamoDB `bronze_committed` table (new `bronze/service/audit_db.py` wrapping boto3 DynamoDB).

**Datalake, unchanged:**
- `bronze/writer.py` (CHECKPOINT + WAL guard + retention)
- `bronze/manifest.py`
- `bronze/state_sync.py`
- `bronze/s3_client.py`
- `bronze/hub_client.py`
- `bronze/lock.py` (semantically repurposed, but the code is unchanged)

**Datalake, tests:**
- `tests/unit/test_ack.py` (datalake) and its transformer equivalent are deleted along with `bronze/ack.py` and `transformer_etl/ack.py`.
- `tests/unit/test_runner.py` is mostly retired, replaced by `tests/unit/test_service_routes.py` using FastAPI's `TestClient`.
- New: `test_writer_registry.py` (LRU eviction, idle TTL, concurrent acquire), `test_sqs_publisher.py` (moto), `test_lifecycle.py` (SIGTERM drain).
- Concurrent `POST /finalize`: for projects in the same org, assert serialized; for projects in distinct orgs, assert parallelized.

**Agent:**
- `rdl_uploader.py:85-218`: replace ZIP-and-POST with a per-part PUT loop + finalize POST.
  - The polling loop on `state.list_ready_ticks()` stays; only the upload step changes.
  - Per-part attempts are tracked in the existing `pending_uploads.attempt_count` (no schema migration).
  - The RDL upload path is deleted in the same PR (no flag, no parallel branch).
- `sender.py:17-57`: split into:
  - `put_tick_part(*, base_url, project_id, tick_id, file_seq, parquet_path, sha256_hex, org_id, api_secret) -> dict`. It streams the parquet by passing an open file object as the request body (`requests.put(url, data=f, headers=...)` inside `with open(parquet_path, "rb") as f:`), with the `X-Sha256` header. `requests` uploads file-like bodies in blocks instead of reading them fully into memory. Its `stream=True` flag only controls how the response is downloaded, so it is not needed here.
  - `finalize_tick(*, base_url, project_id, tick_id, manifest_dict, org_id, api_secret) -> dict`: JSON POST. It raises a typed `TickFinalizeError(status_code, body)` so the uploader can branch on 409 vs 422 vs 5xx.
  - The existing exponential backoff (`sender.py:54`) is reused at both levels.
  - `send_to_raw_data_loader` is deleted.
- `config_sync.py`: replace the `raw_data_loader_url` field with `datalake_url`. No transition aliasing is needed (nothing in prod).
- Rename `rdl_uploader.py` → `bronze_uploader.py` to reflect the new destination; the module docstring and log keys are updated.

**Transformer (silver):**
- `infra/lambda/silver_trigger/handler.py:69-127`: swap S3 event parsing for SQS event parsing. The Lambda does the following for each batch:
  - receives a batch of records (`event["Records"]`) and groups them by `(org_id, project_id)`;
  - conditional-puts the `silver_dispatched` rows for each group;
  - spawns one ECS task per group with a `TICK_IDS=<comma-separated>` env var.
- `src/transformer_etl/runner.py`: `process_once()` becomes message-driven.
  - It reads the `TICK_IDS` env var (or a `--ticks` CLI flag) and processes exactly that list, with no S3 sweep.
  - Bronze data is still consumed via `ATTACH bronze.duckdb READ_ONLY`; only the discovery of which ticks to process changes.
  - The existing per-tick processor `tick_processor.process_tick(tick_id)` is reused with a different driver.
- The S3 sweep (… `runner.py` step §5 in the transformer's `CLAUDE.md`) is replaced by the env-var-driven tick list.
- `src/transformer_etl/ack.py` is deleted: no more `_silver_ack.json` writes. Silver-side audit lives in DynamoDB `silver_dispatched` (written by the Lambda) + a per-run summary log.
- Silver Lambda IAM: `sqs:ReceiveMessage`/`DeleteMessage`/`GetQueueAttributes`/`ChangeMessageVisibility` + `dynamodb:PutItem`/`Query` on `silver_dispatched`. Remove the `s3:GetObject` event source binding.
- `CLAUDE.md` trigger-chain diagram + flow §5: the sweep model is gone, replaced by SQS-batch-driven dispatch with an explicit tick list.
- New `silver replay --tick <id>` CLI command: re-enqueues the SQS message for that tick, after clearing the DynamoDB `silver_dispatched` row.

**Backend:**
- Replace the `raw_data_loader_url` field with `datalake_url` in the agent config snapshot endpoint (`/api/agent/config-snapshot`). No flag, no transition alias.
- … `(org_id, project_id)` → `api_secret` pairs via the existing crypto pattern (`app/services/crypto.py`).

**Migration:** hard cutover. Nothing is in production yet, so there are no live tenants, no parallel run, and no shadow-write parity to maintain.
- **Infra:**
  - … IAM changes (the datalake task role gains `sqs:SendMessage`; the silver Lambda gains `sqs:ReceiveMessage`/etc.).
  - ALB + target group + ECS service definition for the new long-lived datalake.
  - CloudWatch alarms: 5xx rate, finalize p95 latency, SQS queue depth, DLQ size.
- **Datalake service** (`upflux-datapulse-datalake/bronze/service/`):
  - FastAPI scaffold with the two routes (PUT parts, POST finalize).
  - Writer registry.
  - Ingest path: refactor `tick_processor.process_tick` to take local paths.
  - Server-side SQLite for tick coordination state.
  - SIGTERM drain.
  - Observability: loguru + APM transactions + Prometheus/EMF metrics.
  - Dockerfile entrypoint flip.
  - Smoke-test against MinIO + LocalStack SQS via docker-compose.
- **Agent:**
  - Rename `rdl_uploader.py` → `bronze_uploader.py` and rewrite the upload step as chunked PUT + finalize POST.
  - Delete `send_to_raw_data_loader`.
  - Replace `raw_data_loader_url` with `datalake_url` in the config snapshot.
  - Smoke E2E: agent (local) → datalake service (docker-compose) → MinIO → LocalStack SQS → silver Lambda (LocalStack) → silver ECS (local). Assert that a `silver_dispatched` row appears in DynamoDB Local and that `silver.duckdb.zip` lands in MinIO with the expected rows.
- **Silver trigger:**
  - Switch the event source of `silver_trigger/handler.py` from S3 to SQS.
  - Parse `event["Records"][i]["body"]` instead of a regex on the S3 key.
  - Add the DynamoDB `silver_dispatched` conditional put for SQS at-least-once handling.
  - Update the `CLAUDE.md` trigger-chain diagram.

RDL retirement is a separate plan, opened after step 5 stabilizes.
- **Unit.** `tests/unit/test_ack.py` (datalake) and its transformer equivalent are deleted along with `bronze/ack.py` and `transformer_etl/ack.py`. New coverage: service routes (FastAPI `TestClient`), writer-registry concurrency, SQS publisher (moto), lifecycle/drain.
- **Same-org concurrency.** Concurrent `POST /finalize` for projects under the same `org_id` → fully serialized: one DuckDB writer instance touched at a time, instrumented with a `Counter` on connection acquire.
- **Cross-org concurrency.** Concurrent `POST /finalize` for projects across 10 distinct `org_id`s → parallel, finishing within 2× the single-org time (cross-org locks don't contend).
- **Idempotency.** Re-PUT of the same key with the same sha → `200 already_present`; same key, different sha → `409`. Re-POST of `/finalize` after `committed` → cached `200` with the same body.
- **Local E2E.** Run `scripts/smoke_local.py` and assert that one tick completes the full chain within budget:
  1. agent → datalake;
  2. `bronze.duckdb.zip` in MinIO + `bronze_committed` row in DynamoDB;
  3. SQS message → silver Lambda;
  4. `silver_dispatched` row + `silver.duckdb.zip` in MinIO with the expected rows.
- … (`bronze.duckdb` integrity check + row count match against expected), p95 finalize latency < 30 s, autoscale fires within target.
- **Steady state.** Every `bronze_committed` row eventually has a matching `silver_dispatched` row within budget (lag alarm).
- … `tick_id` × `file_seq` × `sha256`, with server-side partial state in SQLite.
- **Code location.** The service lives in the existing `upflux-datapulse-datalake` repo under `bronze/service/`. We are not creating a new repo or a new service identity.
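The steady-state invariant, and the recovery query in the failure table, reduce to a set difference plus a time budget. This is a sketch over plain Python data under these assumptions:

- The two DynamoDB tables have already been read into memory.
- `committed_at` is ISO-8601 UTC.
- `lagging_ticks` and the 15-minute budget in the example are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def lagging_ticks(bronze_committed: list[dict], dispatched_tick_ids: set[str],
                  now: datetime, budget: timedelta) -> list[str]:
    """tick_ids committed to bronze more than `budget` ago with no silver_dispatched row."""
    late = []
    for row in bronze_committed:
        # committed_at is assumed to be ISO-8601 UTC with a trailing "Z".
        committed_at = datetime.fromisoformat(row["committed_at"].replace("Z", "+00:00"))
        if row["tick_id"] not in dispatched_tick_ids and now - committed_at > budget:
            late.append(row["tick_id"])
    return late


now = datetime(2026, 5, 7, 13, 0, tzinfo=timezone.utc)
rows = [
    {"tick_id": "a", "committed_at": "2026-05-07T12:00:00Z"},  # old, never dispatched
    {"tick_id": "b", "committed_at": "2026-05-07T12:55:00Z"},  # recent, still within budget
    {"tick_id": "c", "committed_at": "2026-05-07T12:00:00Z"},  # old but dispatched
]
print(lagging_ticks(rows, {"c"}, now, timedelta(minutes=15)))  # ['a']
```

The same function can back both the lag alarm (alert when the list is non-empty) and the recovery script (re-enqueue each returned `tick_id`).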