Parliamentary AI Monitor — Implementation Plan v3

What Changed from v2

v2 used a two-step pipeline: Whisper (ASR) → Gemini (analysis). Two models, two API calls, chunk-based every 30 seconds.

v3 replaces both with a single model: Gemini 3.1 Flash Live (released 26 March 2026), which handles Thai speech-to-text and legal analysis in one continuous WebSocket stream. Whisper is gone entirely.

[Diagram]

Why this matters:


Confirmed Requirements


The Shared Core Pipeline (v3)

All three paths run the same engine. The output step is the only difference.

[Diagram]

The one question this prototype needs to answer:

"Does Gemini 3.1 Flash Live understand Thai parliamentary speech accurately enough — and what's the fastest way to find out?"


The Shared Core Session (code used by all three paths)

# core.py — shared by all three paths
import asyncio
import sounddevice as sd
from google import genai

client = genai.Client(api_key="YOUR_GEMINI_KEY")

SYSTEM_PROMPT = """
คุณคือ Smart MP Advisor ผู้เชี่ยวชาญด้านข้อบังคับการประชุมรัฐสภาและรัฐธรรมนูญ
[paste Smart MP Advisor instructions here]

ฟังเสียงการประชุม เมื่อได้ยินคำพูดที่อาจละเมิดข้อบังคับ ให้ตอบกลับทันทีในรูปแบบ:
สถานะ: AT_RISK
ข้อบังคับที่เกี่ยวข้อง: (ระบุข้อ)
เหตุผล: (สั้นๆ)

ถ้าคำพูดปกติ ไม่ต้องตอบกลับ — ตอบเฉพาะเมื่อพบความเสี่ยงเท่านั้น
"""

LIVE_CONFIG = {
    "response_modalities": ["TEXT"],          # text output only, no audio
    "system_instruction": SYSTEM_PROMPT,
    "generation_config": {
        "thinking_config": {"thinking_level": "minimal"}  # lowest latency
    }
}

async def run_monitor(on_alert):
    """
    Opens a Gemini Live session and streams mic audio continuously.
    Calls on_alert(text) whenever AT_RISK is detected.
    Each path passes its own on_alert function.
    """
    async with client.aio.live.connect(
        model="gemini-3.1-flash-live-preview",
        config=LIVE_CONFIG
    ) as session:

        async def send_audio():
            with sd.InputStream(samplerate=16000, channels=1,
                                dtype='int16', blocksize=1024) as stream:
                while True:
                    chunk, _ = stream.read(1024)
                    await session.send_realtime_input(
                        audio={"data": chunk.tobytes(),
                               "mime_type": "audio/pcm;rate=16000"}
                    )
                    await asyncio.sleep(0)

        async def receive_text():
            async for response in session.receive():
                for part in response.server_content.model_turn.parts:
                    if part.text and "AT_RISK" in part.text:
                        await on_alert(part.text)

        await asyncio.gather(send_audio(), receive_text())

Every path below imports run_monitor and passes its own on_alert — that's the only thing that differs.


Audio Input — Two Modes, Same Session

[Diagram]

For livestream (plenary hall), replace the sd.InputStream block in send_audio() with:

import subprocess, asyncio

async def send_audio_from_livestream(session, url):
    proc = subprocess.Popen([
        "ffmpeg", "-i", url,
        "-ar", "16000", "-ac", "1",
        "-f", "s16le", "-"          # raw PCM to stdout
    ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)

    while True:
        chunk = proc.stdout.read(2048)
        if not chunk:
            break
        await session.send_realtime_input(
            audio={"data": chunk, "mime_type": "audio/pcm;rate=16000"}
        )
        await asyncio.sleep(0)

One function swap. Session config unchanged.


3 Prototype Paths

[Diagram]

Path A — Local Web App (Streamlit)

The idea: core.py + Streamlit UI running on a laptop in the committee room. No server, no deploy. One command, browser tab opens. Optionally expose to the MP's phone via ngrok.

[Diagram]

Build sequence:

[Diagram]

Stack:

Python 3.11+
├── sounddevice          # mic capture
├── google-genai         # Gemini Live API (new SDK)
└── streamlit            # UI

Path A app.py:

# app.py
import asyncio, threading
import streamlit as st
from core import run_monitor

st.set_page_config(page_title="Parliament Monitor", layout="wide")
st.title("🏛️ Parliamentary AI Monitor")

if "alerts" not in st.session_state:
    st.session_state.alerts = []

alert_placeholder = st.empty()

async def on_alert(text):
    st.session_state.alerts.insert(0, text)
    st.rerun()

def start_monitor():
    asyncio.run(run_monitor(on_alert))

# Start monitoring in background thread
if st.button("▶ Start Session"):
    t = threading.Thread(target=start_monitor, daemon=True)
    t.start()

# Display alerts
for alert in st.session_state.alerts:
    st.error(alert)

For phone access (zero code change):

# Terminal 1
streamlit run app.py

# Terminal 2
ngrok http 8501

Pros: Fastest to iterate. Full visual running log. Everything on one screen. Cons: Laptop must stay open. Phone access needs ngrok (one extra command).


Path B — LINE Bot

The idea: Same core.py. on_alert pushes a LINE message instead of updating a UI. The alert arrives on the MP's phone without any new app or URL.

[Diagram]

Build sequence:

[Diagram]

Two LINE options:

Option Setup Cost Best for
LINE Notify 5 min, personal token Free 1 recipient, fastest test
LINE Messaging API 30 min, channel setup Free tier Multiple recipients, richer format

Path B main.py:

# main.py
import asyncio, requests

LINE_NOTIFY_TOKEN = "YOUR_TOKEN"

def push_to_line(text: str):
    icon = "🔴"
    requests.post(
        "https://notify-api.line.me/api/notify",
        headers={"Authorization": f"Bearer {LINE_NOTIFY_TOKEN}"},
        data={"message": f"\n{icon} {text}"}
    )

async def on_alert(text: str):
    push_to_line(text)
    print(f"[PUSHED] {text}")

from core import run_monitor
asyncio.run(run_monitor(on_alert))

What the MP sees on their phone:

🔴 AT_RISK
──────────────────
📝 "ท่านประธานที่หน้าตาดีที่นั่งอยู่..."
⚖️ ข้อบังคับการประชุม ข้อ 45
💬 อาจเข้าข่ายการใช้ถ้อยคำเสียดสี

Pros: Fastest to a working demo. Zero setup for the MP. Most socially invisible. Cons: LINE Notify deprecated end-2025 — use Messaging API. No running log view.


Path C — n8n Workflow

The idea: core.py handles the Gemini Live session (WebSocket requires persistent Python — n8n can't do this natively). When AT_RISK is detected, it calls an n8n webhook. n8n takes over from there: formats, routes, logs.

This is the right split: Python owns the real-time session, n8n owns the delivery and logging logic.

[Diagram]

Build sequence:

[Diagram]

Path C main.py:

# main.py
import asyncio, requests

N8N_WEBHOOK_URL = "https://your-n8n-instance/webhook/parliament-alert"

async def on_alert(text: str):
    requests.post(N8N_WEBHOOK_URL, json={
        "status": "AT_RISK",
        "text": text,
        "timestamp": __import__('datetime').datetime.now().isoformat()
    })
    print(f"[WEBHOOK SENT] {text}")

from core import run_monitor
asyncio.run(run_monitor(on_alert))

n8n webhook payload example:

{
  "status": "AT_RISK",
  "text": "สถานะ: AT_RISK\nข้อบังคับที่เกี่ยวข้อง: ข้อ 45\nเหตุผล: อาจเข้าข่ายการใช้ถ้อยคำเสียดสี",
  "timestamp": "2026-03-28T10:43:12"
}

Pros: n8n owns all routing/logging — fully visible canvas, no code to maintain there. Adding Slack, email, or more recipients is one extra n8n node. Auto-logging to Google Sheets out of the box. Cons: Requires Python running alongside n8n (unavoidable — WebSocket sessions need persistent process).


Decision Matrix

Path A — Streamlit Path B — LINE Bot Path C — n8n
Build time 2–3 days 1–2 days 2–3 days
Lines of Python ~50 (app.py + core.py) ~20 (main.py + core.py) ~15 (main.py + core.py)
Infrastructure Laptop only Laptop + LINE account Laptop + n8n + LINE
MP experience Watch screen / open URL Check LINE (already open) Check LINE / email
Visual running log ✅ Yes ❌ Push per alert ❌ Push per alert
Modifiable by non-dev ❌ No ❌ No ✅ Yes (n8n canvas)
Livestream input ✅ One function swap ✅ One function swap ✅ One function swap
Multi-recipient Via ngrok URL share ✅ LINE group ✅ Native n8n nodes
Auto-logging Manual Manual ✅ Google Sheets node
Session reconnect logic Needs handling Needs handling Needs handling
Best for Demos + iteration Fastest deploy Handoff + auditability

All three paths share core.py exactly. Switching between paths is swapping on_alert — 10 lines, not a rewrite.


Recommendation

[Diagram]

Given confirmed answers — MP checks phone, n8n already running, no live dashboard needed:

Path A earns its place only if a visual running log turns out to matter after the first real test.


What "Done" Looks Like

The prototype is done when all four are true:

  1. Real Thai speech from a real microphone streams into Gemini 3.1 Flash Live
  2. Gemini returns a text analysis using the Smart MP Advisor system prompt
  3. AT_RISK result triggers an alert that reaches the MP within 15 seconds of speaking
  4. A real person in a real committee session says: "มันทำงานได้จริง"

Everything beyond that is the next sprint.