Comunicación Agentic ↔ RAG

Cómo se hablan ypf-solped-agentic-solution y ypf-solped-ingestion-etl

Mapa de la comunicación entre la capa agéntica (LangGraph) y el servicio RAG (FastAPI + pgvector). Pensado como referencia para onboarding y para diseñar cambios que toquen el contrato entre ambos servicios.


TL;DR

No le hace "preguntas" al RAG — le pega a un endpoint HTTP.

El agentic NO conversa con el RAG ni le delega razonamiento. El RAG es un servicio de retrieval puro: recibe una query (texto plano), genera el embedding con Azure OpenAI, busca los top-K chunks más cercanos en pgvector (cosine similarity), y devuelve los chunks crudos. El LLM vive en el agentic — el RAG nunca llama un modelo generativo.

Hay dos endpoints, uno por cada colección de conocimiento:

Endpoint Base de conocimiento Usado por
POST /search/pliegos-historicos RAG-1 — pliegos históricos de YPF HistoricalRAGAdapter
POST /search/normativas-internas RAG-2 — normativas internas NormativeRAGAdapter

Auth: propagación del JWT del usuario (mismo token MSAL que validó el request original en el agentic se reenvía como Authorization: Bearer … al RAG, que lo re-valida contra ypf-supply-ms-auth).


Diagrama de flujo — un request end-to-end

[Diagram]

Diagrama de arquitectura — quién depende de quién

[Diagram]

Detalle paso a paso

1. El frontend dispara el chat
POST /api/v1/chat/v2
Authorization: Bearer <JWT-de-MSAL>
{ "message": "Generame un pliego de servicios técnicos para …" }
2. Agentic valida el token y lo guarda en un ContextVar

El handler valida el JWT contra ypf-supply-ms-auth. Inmediatamente después guarda el token crudo en un ContextVar request-scoped:

Esto es clave: los nodos del grafo NO reciben el token como parámetro (ensuciaría toda la firma del State). Lo recuperan del ContextVar justo antes de pegarle al RAG.

3. El grafo LangGraph llega a un nodo que necesita conocimiento

Dos nodos relevantes:

Ambos están decorados con @guard_node_execution(): si la llamada al RAG falla, devuelven chunks vacíos y dejan flags rag1_available=False / rag2_available=False en el state. Fail-open: el grafo sigue avanzando.

4. El adapter HTTP arma el request

Payload típico:

{
  "query": "obligaciones del contratista en obras de mantenimiento",
  "top_k": 5,
  "threshold": 0.70,
  "tipo_pliego": "servicios"
}

Headers:

Authorization: Bearer <mismo JWT del usuario>
Accept: application/json

Política de reintentos (tenacity, exponencial backoff 1→2→4s, máx 2 intentos): solo reintenta sobre TimeoutException / ConnectError. NO reintenta sobre HTTP 4xx/5xx (un 401 del RAG significa "token inválido" y reintentar no arregla nada).

5. El RAG procesa la búsqueda

src/api/routers/search.py ejecuta esta secuencia:

Request → get_current_user (dependencies.py:32-83)
       → valida JWT contra ypf-supply-ms-auth (ypf_auth/adapter.py:91-181)
   ↓
_get_embedding_client (search.py:114)
   → obtiene OpenAIEmbeddingClient singleton de app.state
   ↓
embed_chunks (OpenAIEmbeddingClient)
   → Azure OpenAI embeddings.create(model="text-embedding-3-large")
   → vector[1536]
   ↓
pgvector search (asyncpg)
   → SQL: SELECT chunk, source, section, metadata,
                  1 - (embedding <=> $1::vector) AS score
            FROM pliegos_historicos
            WHERE 1 - (embedding <=> $1::vector) > $threshold
              [AND tipo_pliego = $4]      -- filtro opcional
            ORDER BY score DESC LIMIT $top_k
   ↓
SearchResponse { results: [ChunkResult...], total: int }

Para /search/normativas-internas se agrega un filtro temporal WHERE vigente_desde <= NOW() para no devolver normativas no vigentes todavía.

6. El nodo inyecta los chunks al state
state.rag1_chunks = [c.chunk for c in response.results]
state.rag2_chunks = [...]
7. El LLM (en el agentic) usa los chunks como contexto

Cuando un nodo de generación arma el prompt, concatena los chunks como contexto y llama a Azure OpenAI chat.completions. El RAG nunca llama al LLM generativo — solo retrieval.


Contrato de API

POST /search/pliegos-historicos
Auth Authorization: Bearer <JWT> (validado contra ypf-supply-ms-auth)
Request { query: str, top_k?: int [1-20, default 5], threshold?: float [0-1, default 0.70], tipo_pliego?: str }
Response 200 { results: [ChunkResult], total: int }
401 Token ausente / inválido / auth service rechazó
503 embedding_client no disponible (lifespan no logró armarlo)
422 Request inválido (campos desconocidos — extra="forbid")
POST /search/normativas-internas

Mismo contrato pero sin tipo_pliego y con filtro implícito vigente_desde <= NOW().

ChunkResult (response shape)
class ChunkResult(BaseModel):
    chunk: str          ## texto crudo del chunk
    source: str         ## numero_contrato (RAG-1) | codigo_norma (RAG-2)
    section: str        ## tipo_pliego (RAG-1) | tipo_norma (RAG-2)
    score: float        ## 1 - cosine_distance (mayor = más relevante)
    metadata: dict      ## campos adicionales (fecha, sector, etc.)

Definido en src/domain/rag/models.py:42-107.

Endpoints de admin (no son retrieval — solo para distinguir)
Endpoint Propósito
GET /admin/rag/status Stats de colecciones (docs, chunks, última ingestión)
POST /admin/rag/pipeline/trigger Dispara ETL (full / incremental); 409 si ya corre uno
GET /admin/rag/pipeline/status Progreso del pipeline en ejecución
POST /admin/rag/documents/{id}/retry Reintentar doc fallido
GET /health/ready / GET /health/live Probes K8s (RagHealthPoller del agentic lo consume)

Modelo de auth — la "trust chain"

Frontend (MSAL)
   │  Bearer <JWT firmado por Azure AD>
   ▼
ypf-solped-agentic-solution
   │  validate_token() → ypf-supply-ms-auth → OK
   │  (guarda el JWT en ContextVar)
   │
   │  Bearer <MISMO JWT del usuario>
   ▼
ypf-solped-ingestion-etl
   │  validate_token() → ypf-supply-ms-auth → OK (re-validación)
   ▼
pgvector + Azure OpenAI

Decisiones de diseño:


Resiliencia y observabilidad

Mecanismo Ubicación Detalle
Timeout HTTP RAG_TIMEOUT_SECONDS=10.0 Por request, no acumulativo
Retries RAG_MAX_RETRIES=2 (tenacity) Backoff 1→2→4s. Solo sobre TimeoutException / ConnectError, no 4xx/5xx
Fail-open en el nodo @guard_node_execution() en cada retrieve_*_node Si el RAG no responde, el nodo devuelve chunks vacíos + ragN_available=False. El grafo sigue
Feature flag RAG_ENABLED_FEATURE_FLAG=true Si false, los adapters se cortocircuitan y retornan chunks vacíos sin tocar HTTP
Health polling RagHealthPoller (background task del agentic) GET /health cada 60s → desactiva los adapters mientras el RAG esté degraded para evitar 30s de timeouts
Logging estructurado structlog (vía nelvin-sdk) correlation_id se propaga del request del frontend hasta los logs del pipeline ETL
Trazabilidad Langfuse El embedding del query queda registrado (cliente armado con get_async_openai_client() de nelvin)

Configuración (env vars del agentic)

## URL base del servicio RAG
RAG_SERVICE_URL=http://ypf-rag-service:8001

## Feature flag global — apaga TODA la integración RAG
RAG_ENABLED_FEATURE_FLAG=true

## HTTP client
RAG_TIMEOUT_SECONDS=10.0
RAG_MAX_RETRIES=2

## RAG-1 (histórico)
RAG1_TOP_K=5
RAG1_THRESHOLD=0.70

## RAG-2 (normativas)
RAG2_TOP_K=3
RAG2_THRESHOLD=0.70

Del lado del RAG (ya cubierto en CLAUDE.md del repo ypf-solped-ingestion-etl): YPF_AUTH_BASE_URL, OPENAI_API_KEY, DATABASE_URL, POSTGRES_SCHEMA, etc.


Referencias a archivos clave

ypf-solped-agentic-solution:

Componente Archivo:línea
Adapter HTTP histórico src/integrations/rag/historical_rag.py:156-161
Adapter HTTP normativas src/integrations/rag/normative_rag.py:144-149
ContextVar de auth src/integrations/rag/auth_context.py:31-66
Nodo retrieve histórico src/workflows/pliego_generation/nodes/retrieve_historical_context_node.py:34-38
Nodo retrieve normativas src/workflows/pliego_generation/nodes/retrieve_normative_context_node.py:35-39
Nodo propose_structure (usa ambos) src/workflows/pliego_creation/nodes/propose_structure_node.py
Health poller (background task — buscar RagHealthPoller)

ypf-solped-ingestion-etl:

Componente Archivo:línea
Router /search/* src/api/routers/search.py (pliegos-historicos: ~136, normativas-internas: ~196)
Dependency get_current_user src/api/dependencies.py:32-83
Adapter YPFAuthService src/integrations/ypf_auth/adapter.py:91-181
Modelos ChunkResult / SearchResponse src/domain/rag/models.py:42-107
Singleton OpenAIEmbeddingClient src/app/lifespan.py:287, 300
Router /admin/rag/* src/api/routers/rag_admin.py

Notas finales

No hay orquestador intermedio. El agentic llama directo al RAG por HTTP. No hay un message broker, ni un proxy de retrieval, ni un servicio intermedio que "decida" qué chunks devolver. La inteligencia de retrieval está concentrada en el endpoint del RAG (embed → pgvector → top-K).

El RAG no toma decisiones de producto. No hace rerank con LLM, no genera respuestas, no resume. Es deliberado: la lógica de cómo usar los chunks vive en el agentic. Esto desacopla evoluciones del modelo generativo de evoluciones de la ingesta.

Para evolucionar el contrato: cualquier cambio en SearchRequest / ChunkResult debe coordinarse con el repo agentic. Los adapters HTTP del lado del agentic NO usan pydantic compartido con el RAG — tienen su propia copia del schema. Esto evita acoplamiento de despliegue pero significa que un breaking change requiere PRs sincronizados en ambos repos.