Cómo se hablan
ypf-solped-agentic-solutionyypf-solped-ingestion-etlMapa de la comunicación entre la capa agéntica (LangGraph) y el servicio RAG (FastAPI + pgvector). Pensado como referencia para onboarding y para diseñar cambios que toquen el contrato entre ambos servicios.
No le hace "preguntas" al RAG — le pega a un endpoint HTTP.
El agentic NO conversa con el RAG ni le delega razonamiento. El RAG es un
servicio de retrieval puro: recibe una query (texto plano), genera el
embedding con Azure OpenAI, busca los top-K chunks más cercanos en pgvector
(cosine similarity), y devuelve los chunks crudos. El LLM vive en el
agentic — el RAG nunca llama un modelo generativo.
Hay dos endpoints, uno por cada colección de conocimiento:
| Endpoint | Base de conocimiento | Usado por |
|---|---|---|
POST /search/pliegos-historicos |
RAG-1 — pliegos históricos de YPF | HistoricalRAGAdapter |
POST /search/normativas-internas |
RAG-2 — normativas internas | NormativeRAGAdapter |
Auth: propagación del JWT del usuario (mismo token MSAL que validó el
request original en el agentic se reenvía como Authorization: Bearer … al
RAG, que lo re-valida contra ypf-supply-ms-auth).
POST /api/v1/chat/v2
Authorization: Bearer <JWT-de-MSAL>
{ "message": "Generame un pliego de servicios técnicos para …" }
ContextVarEl handler valida el JWT contra ypf-supply-ms-auth. Inmediatamente después
guarda el token crudo en un ContextVar request-scoped:
src/integrations/rag/auth_context.py:31-66 — set_request_auth_token() /
get_request_auth_token().Esto es clave: los nodos del grafo NO reciben el token como parámetro
(ensuciaría toda la firma del State). Lo recuperan del ContextVar justo antes
de pegarle al RAG.
Dos nodos relevantes:
src/workflows/pliego_generation/nodes/retrieve_historical_context_node.py:34-38
— recibe historical_rag: HistoricalRAGPort por inyección, arma la query
desde el state (nombre de la sección + tipo de licitación) y llama
historical_rag.search(query).src/workflows/pliego_generation/nodes/retrieve_normative_context_node.py:35-39
— idem para normativas.Ambos están decorados con @guard_node_execution(): si la llamada al RAG
falla, devuelven chunks vacíos y dejan flags rag1_available=False /
rag2_available=False en el state. Fail-open: el grafo sigue avanzando.
src/integrations/rag/historical_rag.py:156-161 →
httpx.AsyncClient(timeout=10.0).post(url, json=payload, headers=...)src/integrations/rag/normative_rag.py:144-149 → mismo patrón.Payload típico:
{
"query": "obligaciones del contratista en obras de mantenimiento",
"top_k": 5,
"threshold": 0.70,
"tipo_pliego": "servicios"
}
Headers:
Authorization: Bearer <mismo JWT del usuario>
Accept: application/json
Política de reintentos (tenacity, exponencial backoff 1→2→4s, máx 2 intentos):
solo reintenta sobre TimeoutException / ConnectError. NO reintenta
sobre HTTP 4xx/5xx (un 401 del RAG significa "token inválido" y reintentar no
arregla nada).
src/api/routers/search.py ejecuta esta secuencia:
Request → get_current_user (dependencies.py:32-83)
→ valida JWT contra ypf-supply-ms-auth (ypf_auth/adapter.py:91-181)
↓
_get_embedding_client (search.py:114)
→ obtiene OpenAIEmbeddingClient singleton de app.state
↓
embed_chunks (OpenAIEmbeddingClient)
→ Azure OpenAI embeddings.create(model="text-embedding-3-large")
→ vector[1536]
↓
pgvector search (asyncpg)
→ SQL: SELECT chunk, source, section, metadata,
1 - (embedding <=> $1::vector) AS score
FROM pliegos_historicos
WHERE 1 - (embedding <=> $1::vector) > $threshold
[AND tipo_pliego = $4] -- filtro opcional
ORDER BY score DESC LIMIT $top_k
↓
SearchResponse { results: [ChunkResult...], total: int }
Para /search/normativas-internas se agrega un filtro temporal
WHERE vigente_desde <= NOW() para no devolver normativas no vigentes todavía.
state.rag1_chunks = [c.chunk for c in response.results]
state.rag2_chunks = [...]
Cuando un nodo de generación arma el prompt, concatena los chunks como
contexto y llama a Azure OpenAI chat.completions. El RAG nunca llama al LLM
generativo — solo retrieval.
POST /search/pliegos-historicos| Auth | Authorization: Bearer <JWT> (validado contra ypf-supply-ms-auth) |
| Request | { query: str, top_k?: int [1-20, default 5], threshold?: float [0-1, default 0.70], tipo_pliego?: str } |
| Response 200 | { results: [ChunkResult], total: int } |
| 401 | Token ausente / inválido / auth service rechazó |
| 503 | embedding_client no disponible (lifespan no logró armarlo) |
| 422 | Request inválido (campos desconocidos — extra="forbid") |
POST /search/normativas-internasMismo contrato pero sin tipo_pliego y con filtro implícito
vigente_desde <= NOW().
ChunkResult (response shape)class ChunkResult(BaseModel):
chunk: str ## texto crudo del chunk
source: str ## numero_contrato (RAG-1) | codigo_norma (RAG-2)
section: str ## tipo_pliego (RAG-1) | tipo_norma (RAG-2)
score: float ## 1 - cosine_distance (mayor = más relevante)
metadata: dict ## campos adicionales (fecha, sector, etc.)
Definido en src/domain/rag/models.py:42-107.
| Endpoint | Propósito |
|---|---|
GET /admin/rag/status |
Stats de colecciones (docs, chunks, última ingestión) |
POST /admin/rag/pipeline/trigger |
Dispara ETL (full / incremental); 409 si ya corre uno |
GET /admin/rag/pipeline/status |
Progreso del pipeline en ejecución |
POST /admin/rag/documents/{id}/retry |
Reintentar doc fallido |
GET /health/ready / GET /health/live |
Probes K8s (RagHealthPoller del agentic lo consume) |
Frontend (MSAL)
│ Bearer <JWT firmado por Azure AD>
▼
ypf-solped-agentic-solution
│ validate_token() → ypf-supply-ms-auth → OK
│ (guarda el JWT en ContextVar)
│
│ Bearer <MISMO JWT del usuario>
▼
ypf-solped-ingestion-etl
│ validate_token() → ypf-supply-ms-auth → OK (re-validación)
▼
pgvector + Azure OpenAI
Decisiones de diseño:
RAG_API_KEY ni
cliente con credenciales propias.ypf-supply-ms-auth — fuente única de verdad.| Mecanismo | Ubicación | Detalle |
|---|---|---|
| Timeout HTTP | RAG_TIMEOUT_SECONDS=10.0 |
Por request, no acumulativo |
| Retries | RAG_MAX_RETRIES=2 (tenacity) |
Backoff 1→2→4s. Solo sobre TimeoutException / ConnectError, no 4xx/5xx |
| Fail-open en el nodo | @guard_node_execution() en cada retrieve_*_node |
Si el RAG no responde, el nodo devuelve chunks vacíos + ragN_available=False. El grafo sigue |
| Feature flag | RAG_ENABLED_FEATURE_FLAG=true |
Si false, los adapters se cortocircuitan y retornan chunks vacíos sin tocar HTTP |
| Health polling | RagHealthPoller (background task del agentic) |
GET /health cada 60s → desactiva los adapters mientras el RAG esté degraded para evitar 30s de timeouts |
| Logging estructurado | structlog (vía nelvin-sdk) |
correlation_id se propaga del request del frontend hasta los logs del pipeline ETL |
| Trazabilidad | Langfuse | El embedding del query queda registrado (cliente armado con get_async_openai_client() de nelvin) |
## URL base del servicio RAG
RAG_SERVICE_URL=http://ypf-rag-service:8001
## Feature flag global — apaga TODA la integración RAG
RAG_ENABLED_FEATURE_FLAG=true
## HTTP client
RAG_TIMEOUT_SECONDS=10.0
RAG_MAX_RETRIES=2
## RAG-1 (histórico)
RAG1_TOP_K=5
RAG1_THRESHOLD=0.70
## RAG-2 (normativas)
RAG2_TOP_K=3
RAG2_THRESHOLD=0.70
Del lado del RAG (ya cubierto en CLAUDE.md del repo ypf-solped-ingestion-etl):
YPF_AUTH_BASE_URL, OPENAI_API_KEY, DATABASE_URL, POSTGRES_SCHEMA, etc.
ypf-solped-agentic-solution:
| Componente | Archivo:línea |
|---|---|
| Adapter HTTP histórico | src/integrations/rag/historical_rag.py:156-161 |
| Adapter HTTP normativas | src/integrations/rag/normative_rag.py:144-149 |
| ContextVar de auth | src/integrations/rag/auth_context.py:31-66 |
| Nodo retrieve histórico | src/workflows/pliego_generation/nodes/retrieve_historical_context_node.py:34-38 |
| Nodo retrieve normativas | src/workflows/pliego_generation/nodes/retrieve_normative_context_node.py:35-39 |
| Nodo propose_structure (usa ambos) | src/workflows/pliego_creation/nodes/propose_structure_node.py |
| Health poller | (background task — buscar RagHealthPoller) |
ypf-solped-ingestion-etl:
| Componente | Archivo:línea |
|---|---|
Router /search/* |
src/api/routers/search.py (pliegos-historicos: ~136, normativas-internas: ~196) |
Dependency get_current_user |
src/api/dependencies.py:32-83 |
Adapter YPFAuthService |
src/integrations/ypf_auth/adapter.py:91-181 |
Modelos ChunkResult / SearchResponse |
src/domain/rag/models.py:42-107 |
Singleton OpenAIEmbeddingClient |
src/app/lifespan.py:287, 300 |
Router /admin/rag/* |
src/api/routers/rag_admin.py |
No hay orquestador intermedio. El agentic llama directo al RAG por HTTP. No hay un message broker, ni un proxy de retrieval, ni un servicio intermedio que "decida" qué chunks devolver. La inteligencia de retrieval está concentrada en el endpoint del RAG (embed → pgvector → top-K).
El RAG no toma decisiones de producto. No hace rerank con LLM, no genera respuestas, no resume. Es deliberado: la lógica de cómo usar los chunks vive en el agentic. Esto desacopla evoluciones del modelo generativo de evoluciones de la ingesta.
Para evolucionar el contrato: cualquier cambio en
SearchRequest / ChunkResult debe coordinarse con el repo agentic. Los
adapters HTTP del lado del agentic NO usan pydantic compartido con el RAG —
tienen su propia copia del schema. Esto evita acoplamiento de despliegue pero
significa que un breaking change requiere PRs sincronizados en ambos repos.