A mixin-based, self-populating cache that stops the voice agent from re-synthesizing the same phrases on every call.
| Field | Value |
|---|---|
| Status | Design under validation |
| Prototypes | fw-aisha cache → /experiments/tts-cache-mixin/ · worker → /experiments/tts-writeback-lambda/ |
| Companion docs | tts-cache-redesign-decisions.md (D1–D35, O1–O5) · pipecat-tts-service-hierarchy.md · tts-cache-observer-validation.md |
| Audience | Engineers reviewing/implementing the cache |
In one line: intercept synthesis at `run_tts`, serve repeated sentences from cloud storage instead of the TTS provider, and let a background worker fill the cache from real traffic.
A large fraction of what the bot says is the same lines over and over — the opening, scripted FAQ answers, closings. We cache the synthesized audio for those.
Three properties make it clean:
- **Generic:** one implementation serves every provider.
- **Sentence-level:** it catches the static lines the LLM speaks, not just injected ones.
- **Off the bot:** the expensive work (synthesis and storage) happens in the worker.
| ✅ Goals | 🚫 Non-goals (separate workstreams) |
|---|---|
| Cache repeated phrases, including static lines the LLM emits | The GCS storage/access model |
| Self-populating from real traffic — no pre-seeding | The writeback worker (Lambda) internals |
| Generic across providers; adding one is trivial | Any global telephony loudness/headroom fix |
| Keep synthesis + storage off the realtime bot | |
| Cached audio indistinguishable from live (quality, latency, barge-in) | |
Three facts make the rest of the design obvious.
The LLM streams token chunks (LLMTextFrame); it does not form sentences.
The base TTSService does, via a SimpleTextAggregator, and calls
run_tts(sentence, context_id) once per sentence.
The key hook: `run_tts` receives a complete sentence, which is exactly the unit that maps to one cached clip.

Sentence boundaries use NLTK for Latin scripts plus a punctuation fallback for Devanagari/CJK. The mode is configurable: `SENTENCE` is the default and `TOKEN` is opt-in. We require `SENTENCE` (see §6.6).
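The toy aggregator below makes the token-versus-sentence distinction concrete. It is a simplified stand-in, not Pipecat's `SimpleTextAggregator`. It splits only on sentence-ending punctuation (Latin `.?!`, the Devanagari danda, CJK full stops). The real service uses NLTK for Latin scripts and so handles abbreviations and decimals that this sketch would split wrongly. Each string it yields is what one `run_tts` call would receive.

```python
import re
from typing import Iterable, Iterator

# Sentence-ending punctuation: Latin . ? ! plus Devanagari danda (।) and CJK 。？！
_END = re.compile(r"[.?!।。？！]")


def aggregate_sentences(tokens: Iterable[str]) -> Iterator[str]:
    """Buffer streamed LLM tokens and emit one complete sentence at a time.

    Toy stand-in for the base service's aggregator: it naively splits on any
    sentence-ending character, so "1.35" or "Dr." would be split too.
    """
    buf = ""
    for tok in tokens:
        buf += tok
        # Emit every complete sentence currently sitting in the buffer.
        while (m := _END.search(buf)) is not None:
            sentence, buf = buf[: m.end()].strip(), buf[m.end():]
            if sentence:
                yield sentence
    if buf.strip():
        yield buf.strip()  # flush the tail when the LLM response ends


print(list(aggregate_sentences(["Hello", " there.", " How", " are you?", " Bye"])))
# ['Hello there.', 'How are you?', 'Bye']
```

The LLM produced five token chunks, but the TTS side sees three units. Those units are what the cache keys on.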
The Word*/AudioContext* variants are deprecated; that machinery is now in the
base. Providers subclass one of three live types:
| Base | Examples | How audio leaves `run_tts` |
|---|---|---|
| `TTSService` (HTTP) | OpenAI, Kokoro, Piper, ElevenLabsHttp | `run_tts` yields the audio frames |
| `WebsocketTTSService` | ElevenLabs, Cartesia, Deepgram, Rime | `run_tts` sends text + yields `None`; audio arrives on a background receive task |
| `InterruptibleTTSService` | Lmnt, Fish, Smallest | like Websocket + reconnect-on-interrupt |
fw-aisha uses ElevenLabs and Cartesia. Both are `WebsocketTTSService` subclasses. The fact that WS services yield `None` and deliver audio asynchronously drives several of the choices below.
Because WS audio arrives out of band (and a turn can have several sentences in
flight), the service uses an audio context: a context_id-keyed lane that
collects a request's audio (yielded or pushed), plays lanes in order, and is
cancelled on barge-in. run_tts opens the lane; frames append to it; the lane drains
to the transport. We integrate with this on the hit path (§6.5).
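The lane behaviour described above can be modelled in a few lines. This is a toy written for illustration, not Pipecat's audio-context implementation. The real lanes stream continuously to the transport, while `drain()` here returns everything at once. The sketch shows three things:
- Chunks are filed by `context_id`.
- Lanes play in the order they were opened, even when their audio arrives interleaved.
- Barge-in clears everything, so late audio for a cancelled lane is dropped.

```python
from __future__ import annotations


class AudioContextLanes:
    """Toy model of context_id-keyed audio lanes (illustrative only)."""

    def __init__(self) -> None:
        self._lanes: dict[str, list[bytes]] = {}  # dicts preserve insertion (open) order

    def open(self, context_id: str) -> None:
        self._lanes.setdefault(context_id, [])

    def append(self, context_id: str, chunk: bytes) -> None:
        lane = self._lanes.get(context_id)
        if lane is not None:  # unknown or cancelled lane: drop late audio
            lane.append(chunk)

    def cancel_all(self) -> None:
        self._lanes.clear()  # barge-in

    def drain(self) -> list[bytes]:
        """Return buffered audio lane by lane, in open order, and empty the lanes."""
        out = [chunk for lane in self._lanes.values() for chunk in lane]
        self._lanes.clear()
        return out


lanes = AudioContextLanes()
lanes.open("sent-1")
lanes.open("sent-2")
lanes.append("sent-2", b"B1")  # sentence 2's audio happens to arrive first
lanes.append("sent-1", b"A1")
lanes.append("sent-1", b"A2")
print(lanes.drain())  # [b'A1', b'A2', b'B1']
```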
The bot only reads the cache and decides what to enqueue. All generation and storage happen in the worker. The cache populates as real calls hit the same sentences repeatedly.
We weighed three places to put the cache:
| Approach | Catches LLM-spoken lines? | Pre-seed? | Streaming kept? | Complexity |
|---|---|---|---|---|
| Pre-TTS observer on `TTSSpeakFrame` | ✗ injected only | yes | n/a | low but limited |
| Pre-TTS observer re-aggregating sentences | ✓ but fragile (re-implements NLTK + must match keys) | no | yes | high + drift risk |
| Mixin at `run_tts` ✅ | ✓ (post-aggregation sentence, free) | no | yes | low |
Deciding factor: the high-value content (scripted answers) is spoken by the LLM
as streamed tokens, and only becomes a clean cacheable unit after the TTS service
aggregates it — inside the service. The mixin sits exactly there, so it gets the
right unit with zero re-implementation. (Full analysis: tts-cache-observer-validation.md.)
MRO of `CachedElevenLabsTTS`:

```
CachedElevenLabsTTS → TTSCacheMixin → ElevenLabsTTSService → WebsocketTTSService → TTSService
  • tts.run_tts(...)      → TTSCacheMixin.run_tts         (cache decides)
  • super().run_tts(...)  → ElevenLabsTTSService.run_tts  (provider's real synth)
```
By MRO, tts.run_tts resolves to the mixin first; it calls super().run_tts only on
a miss. The provider class becomes a near-empty identity class. No new pipeline
node — tts_initializer just builds the cached class in the existing TTS slot.
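Stub classes make the MRO mechanics runnable in isolation. Every class here is a hypothetical stand-in rather than a Pipecat class, and frames are plain strings. The point is that listing the mixin first makes `run_tts` resolve to it. Inside the mixin, `super().run_tts` lands on the provider, not on the base.

```python
import asyncio


class TTSServiceStub:
    """Stand-in for the base service."""
    async def run_tts(self, text, context_id):
        yield f"base:{text}"


class ElevenLabsStub(TTSServiceStub):
    """Stand-in for the provider's real synthesis."""
    async def run_tts(self, text, context_id):
        yield f"elevenlabs:{text}"


class TTSCacheMixin:
    """Toy mixin: serve known texts itself, defer everything else via super()."""
    cached_texts: frozenset = frozenset()

    async def run_tts(self, text, context_id):
        if text in self.cached_texts:
            yield f"cached:{text}"  # HIT: the provider is never called
            return
        # MISS: super() is the next class in the MRO, i.e. the provider
        async for frame in super().run_tts(text, context_id):
            yield frame


class CachedElevenLabsStub(TTSCacheMixin, ElevenLabsStub):
    pass  # near-empty: the MRO does all the wiring


async def collect(tts, text):
    return [frame async for frame in tts.run_tts(text, "ctx-1")]


tts = CachedElevenLabsStub()
tts.cached_texts = frozenset({"Hello!"})
print([c.__name__ for c in CachedElevenLabsStub.__mro__])
# ['CachedElevenLabsStub', 'TTSCacheMixin', 'ElevenLabsStub', 'TTSServiceStub', 'object']
print(asyncio.run(collect(tts, "Hello!")))  # ['cached:Hello!']
print(asyncio.run(collect(tts, "Bye.")))    # ['elevenlabs:Bye.']
```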
The `run_tts` decision tree:

```python
async def run_tts(self, text: str, context_id: str):
    # (0) agent-level disable → behave exactly like the plain provider
    if not self._caching_enabled:
        async for frame in super().run_tts(text, context_id):
            yield frame
        return

    if not getattr(self, "_processing_text", True):  # barge-in already happened
        return

    key = self._cache_key(text)                  # see §6.3
    record = await self._cache_backend.get(key)  # Redis GET (tiny JSON pointer)

    if record and record.sample_rate == self.sample_rate:
        # (1) HIT → stream pre-generated audio, never call the provider
        yield TTSStartedFrame(context_id=context_id)
        async for pcm, sr, ch in self._cache_backend.stream_audio(key):  # HTTP GET from GCS
            if not getattr(self, "_processing_text", True):
                break  # interruptible mid-stream
            yield TTSAudioRawFrame(audio=pcm, sample_rate=sr, num_channels=ch)
        yield TTSStoppedFrame(context_id=context_id)
        return

    # (2) MISS → admit/enqueue off the audio path, then synthesize live
    self._spawn(self._admit_and_maybe_enqueue(key, text))
    async for frame in super().run_tts(text, context_id):  # provider's native synth
        yield frame
```
Why it reads this way: the hit path re-checks `_processing_text` on every chunk, so barge-in stops playback instantly.

Governing rule for the cache key: the key must capture everything that changes the audio, so any two clips that share a key are interchangeable. Missing an audio-affecting parameter is a correctness bug, because you serve a clip that sounds different from what was requested. Including an irrelevant parameter only costs a little hit-rate.
```python
import hashlib
import json


def generate_cache_key(text, *, provider, voice_id, model_id, sample_rate,
                       num_channels=1, encoding="pcm_s16le", settings=None):
    sig = _settings_signature(settings or {})  # audio-affecting voice/generation params
    # normalize_for_cache: the text normalizer described below
    raw = (f"{provider}:{model_id}:{voice_id}:{sample_rate}:{num_channels}:"
           f"{encoding}:{sig}:{normalize_for_cache(text)}")
    return hashlib.sha256(raw.encode()).hexdigest()


# Only parameters that change the SOUND; never API keys / URLs / timeouts / transport flags.
_AUDIO_KEYS = {"speed", "stability", "similarity_boost", "style", "use_speaker_boost",
               "language", "controls", "emotion", "pitch", "pronunciation_dict_id"}


def _settings_signature(s: dict) -> str:
    # Canonical form: sorted keys, floats rounded to 3 places, None values dropped.
    sig = {k: (round(v, 3) if isinstance(v, float) else v)
           for k, v in sorted(s.items()) if k in _AUDIO_KEYS and v is not None}
    return hashlib.sha256(json.dumps(sig, sort_keys=True).encode()).hexdigest()[:12]
```
The key includes two parts:
- The full audio contract: provider, `model_id`, `voice_id`, `sample_rate`, `num_channels`, `encoding`.
- A canonical signature of the audio-affecting voice/generation settings: speed, stability, similarity_boost, style, pitch, language, controls/emotion, and the pronunciation dict.

Non-audio or sensitive fields (API keys, URLs, timeouts, transport flags) are excluded. Two extremes are rejected:
- Voice only (too coarse): `speed`, `stability`, etc. vary independently of the voice, so the same key would map to a different sound.
- Every setting (too fine): hashing the excluded non-audio fields as well would fragment the key on changes that do not affect the sound.

The signature is canonical (sorted keys, floats rounded, None/defaults dropped), so representation quirks don't fragment it.

It is computed from the effective settings actually used for synthesis, after the message > env > defaults merge. These are the same settings shipped to the worker, so the stored audio matches the key.

Most of these settings are constant per agent. Including them therefore costs almost nothing, and it makes the cache self-correcting if anyone tunes a setting later.
Text normalization collapses trivial differences and keeps the ones that change the sound:
- Keep `?` and `!`: question and exclamation change intonation.
- Drop `.`, the Hindi danda `।`, commas, etc. These mark declarative sentences or are LLM jitter.
- Keep numbers exactly as written (`1,35,000`, `1.35`, `₹1,35,00,000`), because they are spoken as written.
- Preserve combining marks rather than relying on `\w`. Python's `\w` does not match Devanagari vowel signs, so the old `[^\w\s]` stripped them (ठीक है→ठ क ह). This is a real bug in the current code.

The key is computed only in fw-aisha and shipped to the worker in the SQS message. The worker never recomputes it, so there is no cross-language drift. It synthesizes from the real text and real settings, never from the normalized key. (Spec: tts-cache-observer-validation.md.)
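Here is a minimal sketch of a `normalize_for_cache` that follows the rules listed above. It is one possible implementation, not necessarily the prototype's normalizer. It removes Unicode punctuation (category `P*`) except `?` and `!`. Commas and periods are kept when they sit between two digits. Everything else, including combining marks and currency symbols, passes through untouched, and runs of whitespace are collapsed. Case is left alone, on the assumption that some providers read capitalization differently.

```python
import unicodedata


def normalize_for_cache(text: str) -> str:
    """Collapse punctuation/whitespace jitter; keep intonation marks, numbers, matras."""
    text = unicodedata.normalize("NFC", text)
    out = []
    for i, ch in enumerate(text):
        if unicodedata.category(ch).startswith("P") and ch not in "?!":
            prev = text[i - 1] if i > 0 else ""
            nxt = text[i + 1] if i + 1 < len(text) else ""
            if ch in ",." and prev.isdigit() and nxt.isdigit():
                out.append(ch)   # inside a number: 1,35,000 / 1.35
            else:
                out.append(" ")  # declarative punctuation, danda, LLM jitter
        else:
            out.append(ch)       # letters, combining marks (Mn/Mc), digits, ₹, ?, !
    return " ".join("".join(out).split())


print(normalize_for_cache("ठीक है।"))             # ठीक है
print(normalize_for_cache("Is that ₹1,35,000?"))  # Is that ₹1,35,000?
```

Apostrophes are also category `P`, so "don't" becomes "don t". That is harmless for keying, because only the key uses the normalized text and the worker synthesizes from the original.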
```python
async def _admit_and_maybe_enqueue(self, key, text):
    # ns (cache namespace) and ttl (enqueued-marker TTL) are defined elsewhere in the mixin (not shown)
    redis = await self._cache_backend._get_redis()
    seen = await redis.incr(f"voice_agent:admit:{ns}:{key}")  # misses for this exact text
    if seen < self._admission_threshold:  # not popular enough yet
        return
    if not await redis.set(f"voice_agent:enqueued:{ns}:{key}", "1", nx=True, ex=ttl):
        return  # already enqueued: no SQS spam
    if await self._cache_backend.exists(key):
        return  # already cached
    await SQSService().send_to_sqs("aisha-tts-writeback", {
        "cache_key": key, "text": text, "voice_id": ..., "model_id": ...,
        "sample_rate": self.sample_rate, "namespace": ns,
    })
```
We don't cache a sentence the first time (it might be a one-off). We count misses
per exact sentence and enqueue once it crosses a threshold (recommend 2–3). The
enqueued NX marker guarantees we enqueue exactly once while the worker is busy.
All fire-and-forget, so a miss is never delayed.
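The admission flow can be checked end to end against an in-memory fake. `FakeRedis` and the `queue` list are hypothetical stand-ins for Redis and SQS. The fake mirrors the return conventions of redis-py's asyncio client: `incr` returns the new count, and `set(..., nx=True)` returns `None` when the key already exists. TTLs are ignored. With a threshold of 2, five misses of the same sentence enqueue it exactly once, on the second miss.

```python
import asyncio


class FakeRedis:
    """In-memory stand-in for the two Redis calls the admission path makes."""

    def __init__(self) -> None:
        self.data = {}

    async def incr(self, key: str) -> int:
        self.data[key] = int(self.data.get(key, 0)) + 1
        return self.data[key]

    async def set(self, key, value, nx=False, ex=None):
        if nx and key in self.data:
            return None  # NX failed: key already present
        self.data[key] = value  # ex (TTL) ignored in this toy
        return True


async def admit_and_maybe_enqueue(redis, cached, queue, key, *, ns="elevenlabs",
                                  threshold=2, ttl=600):
    """Same control flow as _admit_and_maybe_enqueue; `queue` stands in for SQS."""
    seen = await redis.incr(f"voice_agent:admit:{ns}:{key}")
    if seen < threshold:
        return False  # not popular enough yet
    if not await redis.set(f"voice_agent:enqueued:{ns}:{key}", "1", nx=True, ex=ttl):
        return False  # already enqueued
    if key in cached:
        return False  # already cached
    queue.append(key)  # the SQS send
    return True


async def main():
    redis, queue = FakeRedis(), []
    results = [await admit_and_maybe_enqueue(redis, set(), queue, "3f9a1c2b")
               for _ in range(5)]
    return results, queue


print(asyncio.run(main()))  # ([False, True, False, False, False], ['3f9a1c2b'])
```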
Hit-path frames are routed through the audio-context lane keyed by context_id. To be
robust across all base types, open the lane explicitly and respect the service's
start/stop-frame conventions instead of assuming frames flow straight through:
```python
await self.create_audio_context(context_id)  # open the lane (base API)
# ... yield TTSStartedFrame (only if the base isn't already doing it) + audio + TTSStoppedFrame
```
This is the main thing to verify when generalizing beyond the two tested providers.
The cache keys on whatever unit run_tts receives — a sentence in SENTENCE
mode (good). In TOKEN mode it'd be individual tokens → useless keys. So the mixin
disables itself in TOKEN mode:
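```python
mode = getattr(self, "_text_aggregation_mode", "")
# Compare the Enum's value: str() of a str-mixin Enum member can render as
# "TextAggregationMode.TOKEN" rather than "token".
if str(getattr(mode, "value", mode)).lower() == "token":
    self._caching_enabled = False
    logger.warning(f"[tts-cache] {self.PROVIDER}: TOKEN mode, caching disabled")
```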
(No built-in "full response" mode exists; a custom aggregator producing one would still work — the mixin just keys on the larger unit.)
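The mode check is worth isolating because of a Python enum pitfall. For an enum declared as `class X(str, Enum)`, as opposed to `StrEnum`, `str(member)` returns the qualified name (`X.TOKEN`), not the value. A plain `str(...) == "token"` comparison would then silently never match. The enum below is a local stand-in: Pipecat's real definition may differ, which is exactly why the helper compares `.value` when it exists.

```python
from enum import Enum


class TextAggregationMode(str, Enum):
    """Local stand-in for the service's aggregation-mode enum."""
    SENTENCE = "sentence"
    TOKEN = "token"


def is_token_mode(mode) -> bool:
    """True for TOKEN whether `mode` is an Enum member or a plain string."""
    value = getattr(mode, "value", mode)  # Enum member -> "token"; str passes through
    return str(value).lower() == "token"


print(is_token_mode(TextAggregationMode.TOKEN),
      is_token_mode("TOKEN"),
      is_token_mode(TextAggregationMode.SENTENCE))  # True True False
```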
The mixin is already base-agnostic. The only per-provider variability is extracting voice/model and the constructor kwargs. Generate the cached classes instead of hand-writing one per provider:
```python
def make_cached(provider_cls, provider_name, *, voice_attr="voice", model_attr="model"):
    class _Cached(TTSCacheMixin, provider_cls):
        PROVIDER = provider_name

        def __init__(self, *a, cache_enabled=True, **kw):
            super().__init__(*a, **kw)
            s = getattr(self, "_settings", None) or kw.get("settings")
            self._init_cache(voice_id=getattr(s, voice_attr, ""),
                             model_id=getattr(s, model_attr, ""),
                             cache_enabled=cache_enabled)

    # Readable class name in logs/reprs, e.g. CachedElevenLabsTTSService
    _Cached.__name__ = _Cached.__qualname__ = f"Cached{provider_cls.__name__}"
    return _Cached


CACHEABLE = {
    "elevenlabs": make_cached(ElevenLabsTTSService, "elevenlabs"),
    "cartesia": make_cached(CartesiaTTSService, "cartesia"),
    # add a provider = one line
}
```
Constructor kwargs differ per provider and have no universal signature, so
tts_initializer keeps the per-provider construction it already has and just wraps the
chosen class. Adding a provider = one registry line + its existing kwargs. Validate
by base type (one HTTP, one WS, one Interruptible), not by all 40 providers.
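The factory can be exercised with stubs to confirm three behaviours:
- `cache_enabled` is consumed by the wrapper and never reaches the provider constructor.
- Voice and model are read off the provider's settings.
- The generated class gets a readable name.

`FakeSettings`, `FakeProviderTTS` and the one-method `TTSCacheMixin` are hypothetical stand-ins. `make_cached` itself is the same as above.

```python
from dataclasses import dataclass


@dataclass
class FakeSettings:  # hypothetical provider settings object
    voice: str
    model: str


class FakeProviderTTS:  # stub provider: keeps its settings like a real service might
    def __init__(self, *, settings: FakeSettings):
        self._settings = settings


class TTSCacheMixin:  # stub: only the hook that make_cached calls
    def _init_cache(self, *, voice_id, model_id, cache_enabled):
        self.voice_id, self.model_id = voice_id, model_id
        self._caching_enabled = cache_enabled


def make_cached(provider_cls, provider_name, *, voice_attr="voice", model_attr="model"):
    class _Cached(TTSCacheMixin, provider_cls):
        PROVIDER = provider_name

        def __init__(self, *a, cache_enabled=True, **kw):
            super().__init__(*a, **kw)  # cache_enabled never reaches the provider
            s = getattr(self, "_settings", None) or kw.get("settings")
            self._init_cache(voice_id=getattr(s, voice_attr, ""),
                             model_id=getattr(s, model_attr, ""),
                             cache_enabled=cache_enabled)

    _Cached.__name__ = _Cached.__qualname__ = f"Cached{provider_cls.__name__}"
    return _Cached


CACHEABLE = {"fake": make_cached(FakeProviderTTS, "fake")}
tts = CACHEABLE["fake"](settings=FakeSettings(voice="v-123", model="m-1"), cache_enabled=False)
print(type(tts).__name__, tts.PROVIDER, tts.voice_id, tts._caching_enabled)
# CachedFakeProviderTTS fake v-123 False
```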
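If the GCS fetch fails (missing blob, 403, timeout, empty body), the cache must not play silence. It must fall back to live synthesis, and what that means depends on when the failure happens:
- **Before the first chunk** (nothing has played yet): fall back to live synthesis via the normal miss path.
- **Mid-stream** (part of the clip is already playing): end the clip gracefully by logging the drop and sending `TTSStoppedFrame`.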
Two changes make this work: (1) the read backend must raise on non-2xx/empty
instead of silently returning nothing (today it swallows → silence); (2) the hit path
peeks the first chunk before committing (before TTSStartedFrame):
```python
if record is not None and record.sample_rate == self.sample_rate:
    stream = self._cache_backend.stream_audio(key)  # raises CacheReadError on non-2xx/empty
    try:
        first = await anext(stream)  # validates the fetch opened OK (also the TTFA sample); Python 3.10+
    except (StopAsyncIteration, CacheReadError) as e:
        log_cache_event("hit_fetch_fail", key, ns, note=str(e))
        await self._cache_backend.delete(key)  # drop the dangling pointer
        record = None  # ↓ fall through to the MISS path
    else:
        pcm, sr, ch = first
        yield TTSStartedFrame(context_id=context_id)
        yield TTSAudioRawFrame(audio=pcm, sample_rate=sr, num_channels=ch)
        try:
            async for pcm, sr, ch in stream:
                if not getattr(self, "_processing_text", True):
                    break  # barge-in: stop mid-clip
                yield TTSAudioRawFrame(audio=pcm, sample_rate=sr, num_channels=ch)
        except CacheReadError:
            log_cache_event("hit_stream_drop", key, ns)  # mid-stream drop → end gracefully
        yield TTSStoppedFrame(context_id=context_id)
        return
# record is None here on a failed hit → normal miss path (live synth + admission → regenerate)
```
The failed hit lands in the normal miss path, so it self-heals: live synth now, admission counter climbs, the worker regenerates the missing blob and re-registers it. Deleting the pointer means the next call is a clean miss, not another failed fetch. With permanent audio (D35) this is rare — it's the safety net for a dead URL (the brief window before a just-registered blob is readable, an accidental deletion, or a transient GCS/auth blip), turning it into graceful degradation instead of silence.
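The peek-first pattern can be tested without GCS by feeding it async generators that fail at different points. `CacheReadError` and the four stream generators are hypothetical stand-ins for the read backend. Frames are plain byte strings. The function returns what would be played for one sentence.

```python
import asyncio


class CacheReadError(Exception):
    """Stand-in for what the read backend raises on non-2xx or an empty body."""


async def play_hit_or_fallback(cached_stream, live_stream):
    """Peek the first cached chunk; fall back to live only if nothing has played."""
    try:
        first = await cached_stream.__anext__()  # peek: did the fetch open OK?
    except (StopAsyncIteration, CacheReadError):
        return [chunk async for chunk in live_stream]  # MISS path: live synthesis
    played = [first]
    try:
        async for chunk in cached_stream:
            played.append(chunk)
    except CacheReadError:
        pass  # mid-stream drop: end the clip gracefully
    return played


async def healthy():
    yield b"A1"
    yield b"A2"


async def dead_url():
    raise CacheReadError("404")
    yield  # unreachable; makes this an async generator


async def drops_midway():
    yield b"A1"
    raise CacheReadError("connection reset")


async def live():
    yield b"LIVE"


print(asyncio.run(play_hit_or_fallback(healthy(), live())))       # [b'A1', b'A2']
print(asyncio.run(play_hit_or_fallback(dead_url(), live())))      # [b'LIVE']
print(asyncio.run(play_hit_or_fallback(drops_midway(), live())))  # [b'A1']
```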
The stage latencies decompose TTFB, which is what actually matters:
```
TTFB (first audio) = LLM first-token
                   + time to aggregate 1st sentence
                   + Redis pointer fetch
                   + [ HIT: GCS time-to-first-chunk | MISS: provider synth TTFA ]
```
Hit/Miss is measured per sentence, reported per call. A hit/miss happens per
run_tts (per sentence); a call is usually mixed, so per-sentence is the source of
truth for hit rate, and the per-call summary is the rollup. Weight the rate by
characters (a cached long sentence saves more than a short one), so put chars= on
the per-sentence line. Also tag the first sentence of each turn — it dominates
perceived TTFB. GCS metric = time-to-first-chunk (TTFA), not total download (audio
plays as it streams); the "peek first chunk" above is that sample.
| Metric | Granularity | Where | Status |
|---|---|---|---|
| Time to aggregate 1st sentence | per turn | base `start/stop_text_aggregation_metrics` | via MetricsLogger |
| Redis pointer fetch (`redis_ms`) | per sentence | mixin | exists |
| GCS first-chunk (`gcs_ttfa_ms`) | per sentence (hit) | mixin hit path | add |
| Hit/Miss (+ chars) | per sentence | mixin | add chars |
| Provider synth TTFA (miss) | per sentence (miss) | provider TTFB metric | exists |
| TTS audio bytes/duration | per turn + call | TTSResponseSizeLogger | exists |
| TTFB (first audio) | per turn | TTFBCapture (hit & miss) | exists |
| Call rollup | per call | teardown | extend `TTS_CACHE_SUMMARY` |
Consistent, greppable lines (no emojis), all in logger.py:
```
# per sentence (one per run_tts)
TTS_CACHE event=hit key=3f9a1c2b ns=elevenlabs turn=7 sent=1 chars=34 redis_ms=1.4 gcs_ttfa_ms=88.0
TTS_CACHE event=miss key=7bce04a1 ns=elevenlabs turn=7 sent=2 chars=52 redis_ms=1.2 counter=1

# per turn (TTFB attributed to its stages)
TTS_TURN turn=7 agg_ms=210 ttfb_ms=140 first_sentence=hit audio_bytes=96000

# per call (rollup at teardown)
TTS_CACHE_SUMMARY call_sid=abc hits=9 misses=3 rate=75.0% chars_hit=812 chars_total=1180 \
  char_rate=68.8% audio_bytes=1.9M avg_ttfb_hit_ms=135 avg_ttfb_miss_ms=430
```
char_rate is the cost-relevant hit rate; avg_ttfb_hit_ms vs avg_ttfb_miss_ms
shows the cache's latency win.
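The count-based and character-weighted hit rates can be recomputed from the per-sentence lines, which is also a quick way to sanity-check `TTS_CACHE_SUMMARY`. The sketch below is a hypothetical offline rollup over the log format shown above, not the teardown code. It skips any line that is not a `TTS_CACHE event=` line, including the summary itself.

```python
import re

# Per-sentence lines only; "TTS_CACHE_SUMMARY ..." does not match this prefix.
_SENTENCE_LINE = re.compile(r"^TTS_CACHE event=(hit|miss)\b.*?\bchars=(\d+)")


def rollup(lines):
    hits = misses = chars_hit = chars_total = 0
    for line in lines:
        m = _SENTENCE_LINE.match(line)
        if m is None:
            continue
        chars = int(m.group(2))
        chars_total += chars
        if m.group(1) == "hit":
            hits += 1
            chars_hit += chars
        else:
            misses += 1
    total = hits + misses
    return {
        "hits": hits,
        "misses": misses,
        "rate": round(100 * hits / total, 1) if total else 0.0,
        "chars_hit": chars_hit,
        "chars_total": chars_total,
        "char_rate": round(100 * chars_hit / chars_total, 1) if chars_total else 0.0,
    }


sample = [
    "TTS_CACHE event=hit key=3f9a1c2b ns=elevenlabs turn=7 sent=1 chars=34 redis_ms=1.4 gcs_ttfa_ms=88.0",
    "TTS_CACHE event=miss key=7bce04a1 ns=elevenlabs turn=7 sent=2 chars=52 redis_ms=1.2 counter=1",
]
print(rollup(sample))
# {'hits': 1, 'misses': 1, 'rate': 50.0, 'chars_hit': 34, 'chars_total': 86, 'char_rate': 39.5}
```

The two sample lines show why the weighting matters. Half the sentences hit, but the hit was the shorter one, so under 40% of the characters came from cache.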
- **Read backend** (`tts_cache_backend`, renamed from `gcp_url_backend`, D33):
  - Redis holds a tiny JSON pointer per key, and the audio lives in GCS.
  - A hit is a Redis GET plus an HTTP GET of the audio URL, so there is no GCS SDK on the read path.
  - The GCS write path is consolidated into the existing `app/services/gcs.py`, giving one credential source (D32).
- **Worker:**
  - Consumes `aisha-tts-writeback` one message at a time.
  - Synthesizes (multi-provider, dynamic settings), uploads raw audio to GCS, then calls the register endpoint.
  - Never touches Redis, which avoids the cross-cloud AWS↔GCP-Redis problem. Only fw-aisha (next to Redis) writes the pointer.

Premise: audio blobs are kept permanently in GCS. A clip is ~96 KB (3 s @ 16 kHz s16le), so ~10k phrases ≈ ~1 GB ≈ ~$0.02/month. That is negligible, and read egress is the same whether we retain or delete.
Keeping audio forever removes a whole class of complexity: `evict_cache.py`, the `voice_agent:freq` sorted set, the per-hit `ZINCRBY`, and the scheduler all disappear.

The only store to bound is Redis, which is shared with other use-cases. It is bounded by a TTL on the pointer keys. There is no sweeper and no `maxmemory`/`allkeys-*` policy, since such a policy would put the other use-cases at risk. Two variants:
- **Fixed 12h TTL:**
  - `/register` writes `SET <key> <json> EX 43200`, so the pointer lives 12h from creation regardless of use.
  - On recurrence after expiry: miss → re-admit → re-enqueue → worker re-synthesizes and idempotently re-uploads the same blob → pointer recreated with a fresh TTL.
  - Cost: `EX` at register; nothing on the hot path.
- **Sliding 12h TTL:**
  - Same `SET ... EX 43200` at register, plus `EXPIRE <key> 43200` on every hit.
  - A pointer only expires after 12h idle (TTL-LRU), so hot phrases stay cached.
  - Cost: one `EXPIRE` per hit (off the audio path).
  - No re-warm churn for active phrases; only phrases that go idle and are later revived re-synthesize.

| | Fixed 12h | Sliding 12h |
|---|---|---|
| Pointer lifetime | 12h from creation | 12h from last hit |
| Hot phrases | regenerate ~every 12h | stay cached while used |
| Hot-path cost | none (GET only) | +1 EXPIRE per hit |
| Re-warm churn | yes, periodic | none for active phrases |
| Redis footprint | 12h-since-creation window | 12h active working set |
| Complexity | lowest | low |
Recommendation: start with fixed (simplest, zero hot-path cost); move to sliding if measurement shows hot phrases regenerating often enough to matter. Optional (helps fixed): on a miss, recreate the pointer directly if the GCS blob already exists — skips the re-warm synth for one GCS existence check.
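A small simulation makes the churn difference between the two TTL variants concrete. `PointerStore` is a hypothetical in-memory model with time passed in explicitly: `register` plays the role of `SET ... EX 43200`, and a sliding hit plays the role of `EXPIRE`. It models a phrase spoken once an hour for 30 hours. It optimistically assumes re-registration is instant, ignoring the admission threshold and worker latency, so real fixed-TTL churn would cost a few more live misses per cycle than shown.

```python
HOUR = 3600
TTL = 12 * HOUR  # 43200 s


class PointerStore:
    """Toy pointer store with fixed or sliding expiry."""

    def __init__(self, sliding: bool, ttl: int = TTL) -> None:
        self.sliding, self.ttl = sliding, ttl
        self._expires_at = {}

    def get(self, key: str, now: int) -> bool:
        exp = self._expires_at.get(key)
        if exp is None or now >= exp:
            self._expires_at.pop(key, None)
            return False                            # miss
        if self.sliding:
            self._expires_at[key] = now + self.ttl  # EXPIRE on hit
        return True                                 # hit

    def register(self, key: str, now: int) -> None:
        self._expires_at[key] = now + self.ttl      # SET ... EX at register


def regenerations(sliding: bool, hours: int = 30) -> int:
    """Count (re)syntheses for a phrase spoken once an hour."""
    store, regens = PointerStore(sliding), 0
    for h in range(hours):
        now = h * HOUR
        if not store.get("hot-phrase", now):
            regens += 1
            store.register("hot-phrase", now)
    return regens


print(regenerations(sliding=False), regenerations(sliding=True))  # 3 1
```

Under the fixed TTL the hot phrase is re-synthesized at hours 0, 12 and 24. Under the sliding TTL it is synthesized once and then kept alive by its own hits.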
The original design tracked frequency in a Redis sorted set (ZINCRBY per hit) and ran
evict_cache.py to delete least-frequent entries with score decay. Dropped because:
it needed an out-of-band scheduler that was never wired up (cache just grew); it
added a hot-path write + custom two-store deletion; Redis's built-in
allkeys-lfu can't substitute on a shared Redis (instance-global, can't touch
GCS); and it's over-engineered versus a TTL given audio is cheap and permanent.
- Hook at `run_tts`, not in a separate processor. This gets the post-aggregation sentence for free, and it is the only way to cache LLM-spoken lines without re-implementing aggregation. (§5)
- Normalization keeps intonation (`?`/`!`) and numbers, collapses jitter, and fixes the Devanagari `\w` bug. (§6.3)
- Observability: per-sentence hit/miss (`chars`-weighted), per-turn TTFB/aggregation, and a per-call rollup. The GCS metric is time-to-first-chunk, and everything lives in `logger.py`. (§6.9, D38)

Done, prototyped with fw-aisha untouched:
- `/experiments/tts-cache-mixin/`: tested normalizer, generic `TTSCacheMixin` (hit/miss/admission/enqueue + TOKEN guard), thin provider subclasses, logging consolidation, migration notes.
- `/experiments/tts-writeback-lambda/`: Node worker (multi-provider, dynamic settings, one-message-at-a-time) + local test harness.

Open / excluded:
| Item | Status |
|---|---|
| O1 — private-bucket read auth (cached SA token) | open, fw-aisha read-path change |
| O2 — admission threshold value (2–3) | open |
| O3 — worker placement (AWS Lambda vs GCP Cloud Run) | open |
| O4 | superseded by D35; sub-choice: fixed vs sliding TTL |
| O5 — global telephony headroom | open, only if QA confirms clipping |
| D32/D33: fold GCS writes into gcs.py; slim + rename backend | planned |
| /tts-cache/register endpoint on fw-aisha | to build (worker's counterpart) |
Until the worker + register endpoint are live, every call is a MISS — by design. That validates the read/admission half safely: behaviour equals today's non-cached path plus the SQS enqueue.
- Validate by base type, using one HTTP (`TTSService`), one WS (EL/Cartesia) and one `InterruptibleTTSService`. For each, confirm hit playback, miss synthesis, interruption, and history/turn tracking.
- … `<spell>` handling; the key includes the settings signature, so a tuned setting produces a new key.
- … `TTS_CACHE_SUMMARY` and project savings at our call volume.