Skip to content

Reference

Auto-generated from Python docstrings for the orateur package.

orateur

Orateur - minimal local speech-to-text and speech-to-speech.

audio_capture

Audio capture for speech recognition.

AudioCapture(device_id: Optional[int] = None, config=None)

Handles audio recording for STT.

Source code in src/orateur/audio_capture.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def __init__(self, device_id: Optional[int] = None, config=None):
    """Set up capture state and select an input device.

    Args:
        device_id: Explicit input device index; overrides config when given.
        config: Optional config object queried for "audio_device_id".
    """
    # Whisper-friendly capture format: 16 kHz mono float32.
    self.sample_rate = 16000
    self.channels = 1
    self.chunk_size = 1024
    self.dtype = np.float32

    # `device_id or ...` would wrongly discard a valid device index of 0,
    # so compare against None explicitly.
    if device_id is not None:
        self.preferred_device_id = device_id
    else:
        self.preferred_device_id = config.get_setting("audio_device_id") if config else None
    self.config = config

    # Recording state shared with the capture thread; guarded by self.lock.
    self.is_recording = False
    self.audio_data = []
    self.lock = threading.Lock()
    self.stream = None
    self.record_thread = None
    self._level_callback = None

    self._init_device()
start_recording(level_callback=None) -> bool

Start recording. level_callback(chunk_rms: float) is called per chunk if provided.

Source code in src/orateur/audio_capture.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def start_recording(self, level_callback=None) -> bool:
    """Start recording. level_callback(chunk_rms: float) is called per chunk if provided."""
    if self.is_recording:
        # Already capturing; report success without restarting.
        return True
    try:
        with self.lock:
            self.audio_data = []
            self._level_callback = level_callback
            self.is_recording = True
        worker = threading.Thread(target=self._record_audio, daemon=True)
        self.record_thread = worker
        worker.start()
        return True
    except Exception as exc:
        log.error("Failed to start: %s", exc)
        with self.lock:
            self.is_recording = False
        return False

audio_utils

Audio utility functions for waveform visualization.

audio_to_levels(audio: np.ndarray, num_bars: int = 60) -> list[float]

Split audio into segments and return RMS per segment normalized to 0-1.

Args: audio: 1D float32 audio array num_bars: Number of bars/levels to return

Returns: List of floats in [0, 1] representing normalized RMS per segment

Source code in src/orateur/audio_utils.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def audio_to_levels(audio: np.ndarray, num_bars: int = 60) -> list[float]:
    """
    Split audio into segments and return RMS per segment normalized to 0-1.

    Args:
        audio: 1D float32 audio array
        num_bars: Number of bars/levels to return

    Returns:
        List of floats in [0, 1] representing normalized RMS per segment
    """
    if audio is None or len(audio) == 0:
        return [0.0] * num_bars

    samples = np.asarray(audio, dtype=np.float32)
    if samples.ndim > 1:
        samples = samples.flatten()

    def _rms(segment):
        # Root-mean-square energy of one segment.
        return float(np.sqrt(np.mean(segment**2)))

    total = len(samples)
    bars = np.zeros(num_bars, dtype=np.float32)

    if total < num_bars:
        # Fewer samples than bars: fill leading bars, leave the rest at zero.
        seg_len = max(1, total // num_bars)
        filled = min(num_bars, (total + seg_len - 1) // seg_len)
        for idx in range(filled):
            lo = idx * seg_len
            hi = min(lo + seg_len, total)
            bars[idx] = _rms(samples[lo:hi])
    else:
        seg_len = total // num_bars
        for idx in range(num_bars):
            lo = idx * seg_len
            # Last bar absorbs the remainder so all samples are covered.
            hi = total if idx == num_bars - 1 else (idx + 1) * seg_len
            segment = samples[lo:hi]
            if len(segment) > 0:
                bars[idx] = _rms(segment)

    # Scale by the peak so the loudest segment maps to 1.0.
    peak = float(np.max(bars))
    if peak > 0:
        bars = np.clip(bars / peak, 0.0, 1.0)
    else:
        bars = np.zeros(num_bars, dtype=np.float32)

    return bars.tolist()

cli

Orateur CLI.

cmd_run(args)

Run main loop.

Source code in src/orateur/cli.py
36
37
38
def cmd_run(args):
    """Entry point for the `run` subcommand: start the main loop."""
    run()

cmd_setup(args)

Install GPU-accelerated pywhispercpp (CUDA, Metal, or PyPI CPU).

Source code in src/orateur/cli.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
def cmd_setup(args):
    """Install GPU-accelerated pywhispercpp (CUDA, Metal, or PyPI CPU).

    Handles --build-from-source (CUDA on Linux x86_64, Metal on Apple
    Silicon) or a backend-selected / auto-detected install, then downloads
    the configured Whisper model and installs the Quickshell component.
    Returns 0 on success, 1 on failure.
    """
    from .install_quickshell import install_quickshell
    from .install_stt import (
        _build_pywhispercpp_cuda_from_source,
        _build_pywhispercpp_metal_from_source,
        _is_apple_silicon,
        _is_linux_x86_64,
        download_whisper_model,
        install_pywhispercpp,
    )

    def _finish_install() -> None:
        # Shared post-install steps: fetch the configured Whisper model
        # (best-effort) and install the Quickshell component.
        config = ConfigManager()
        if not download_whisper_model(config.get_setting("stt_model", "base")):
            logger.warning("Whisper model download failed; first run will try again (needs network)")
        install_quickshell()

    force = getattr(args, "force", False)
    if getattr(args, "build_from_source", False):
        if _is_apple_silicon():
            ok = _build_pywhispercpp_metal_from_source(force=force)
        elif _is_linux_x86_64():
            ok = _build_pywhispercpp_cuda_from_source(force=force)
        else:
            logger.error("--build-from-source is supported on Linux x86_64 (CUDA) or macOS Apple Silicon (Metal)")
            return 1
        if not ok:
            return 1
        _finish_install()
        return 0

    backend = getattr(args, "backend", "auto")
    if backend == "auto":
        backend = None  # let install_pywhispercpp auto-detect
    ok = install_pywhispercpp(backend=backend, force=force)
    if ok:
        _finish_install()
        return 0
    # Match the original flow: Quickshell install is attempted even when
    # the STT install failed.
    install_quickshell()
    return 1

cmd_speak(args)

TTS from arg, clipboard, or selection.

Source code in src/orateur/cli.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def cmd_speak(args):
    """TTS from arg, clipboard, or selection."""
    config = ConfigManager()
    # Prefer explicit argument text; fall back to selection/clipboard.
    text = args.text or _get_text_from_selection(config)
    if not text:
        logger.error("No text to speak")
        return 1
    backend_name = config.get_setting("tts_backend", "pocket_tts")
    tts = get_tts_backend(backend_name, config)
    if not tts or not tts.is_ready():
        logger.error("TTS not ready")
        return 1
    tts.synthesize_and_play(text)
    return 0

cmd_sts(args)

Speech-to-Speech: record -> STT -> LLM -> TTS.

Source code in src/orateur/cli.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def cmd_sts(args):
    """Speech-to-Speech: record -> STT -> LLM -> TTS.

    Records until Ctrl+C, then transcribes the audio, queries the LLM, and
    speaks the reply. Returns 0 on success, 1 on setup/recording failure.
    """
    import time  # hoisted: the original re-ran this import on every loop iteration

    config = ConfigManager()
    stt = get_stt_backend(config.get_setting("stt_backend", "pywhispercpp"), config)
    tts = get_tts_backend(config.get_setting("tts_backend", "pocket_tts"), config)
    llm_name = config.get_setting("llm_backend", "ollama")
    if is_llm_disabled(llm_name):
        logger.error("STS needs an LLM; set llm_backend to ollama (currently %s)", llm_name)
        return 1
    llm = get_llm_backend(llm_name, config)
    if not all([stt and stt.is_ready(), tts and tts.is_ready(), llm and llm.is_ready()]):
        logger.error("STT, TTS, or LLM not ready")
        return 1
    audio = AudioCapture(config=config)
    logger.info("Recording for STS... (Ctrl+C to stop)")
    try:
        audio.start_recording()
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        pass
    logger.info("Stopping...")
    # Run stop_recording in a worker thread so the main thread stays
    # responsive to a second Ctrl+C while audio finalizes.
    result: list = []

    def _run_stop():
        try:
            result.append(audio.stop_recording())
        except Exception as e:
            result.append(e)

    worker = threading.Thread(target=_run_stop, daemon=True)
    worker.start()
    try:
        while worker.is_alive():
            worker.join(timeout=0.2)
    except KeyboardInterrupt:
        logger.info("Stopping... (please wait)")
        worker.join(timeout=5.0)
    data = result[0] if result else None
    if isinstance(data, BaseException):
        # stop_recording raised in the worker; re-raise in the main thread.
        raise data
    if data is None:
        logger.error("No audio")
        return 1
    run_sts(config, data, stt=stt, tts=tts, llm=llm)
    return 0

cmd_transcribe(args)

Record and transcribe only.

Source code in src/orateur/cli.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def cmd_transcribe(args):
    """Record and transcribe only.

    Records until Ctrl+C, transcribes, injects the text (warning when only
    the clipboard path works), and prints it. Returns 0 on success, 1 on
    failure.
    """
    import time  # hoisted: the original re-ran this import on every loop iteration

    config = ConfigManager()
    stt = get_stt_backend(config.get_setting("stt_backend", "pywhispercpp"), config)
    if not stt or not stt.is_ready():
        logger.error("STT not ready")
        return 1
    audio = AudioCapture(config=config)
    logger.info("Recording... (Ctrl+C to stop)")
    try:
        audio.start_recording()
        while True:
            time.sleep(0.5)
    except KeyboardInterrupt:
        pass
    logger.info("Stopping...")
    # Run stop_recording in a thread so main thread stays responsive to Ctrl+C
    result: list = []

    def _run_stop():
        try:
            result.append(audio.stop_recording())
        except Exception as e:
            result.append(e)

    worker = threading.Thread(target=_run_stop, daemon=True)
    worker.start()
    try:
        while worker.is_alive():
            worker.join(timeout=0.2)
    except KeyboardInterrupt:
        logger.info("Stopping... (please wait)")
        worker.join(timeout=5.0)
    data = result[0] if result else None
    if isinstance(data, BaseException):
        # stop_recording raised in the worker; re-raise in the main thread.
        raise data
    if data is None:
        logger.error("No audio")
        return 1
    logger.info("Transcribing...")
    text = stt.transcribe(data)
    if not text or not text.strip():
        logger.error("No transcription")
        return 1
    injector = TextInjector(config)
    if not injector.inject_text(text):
        logger.warning("Could not paste - text copied to clipboard")
    print(text)
    return 0

cmd_ui(args)

Run UI daemon for Quickshell (FIFO commands, JSON events on stdout).

Source code in src/orateur/cli.py
285
286
287
288
289
290
def cmd_ui(args):
    """Run UI daemon for Quickshell (FIFO commands, JSON events on stdout)."""
    from .ui_daemon import _run_ui_daemon

    events_only = getattr(args, "events_only", False)
    _run_ui_daemon(events_only=events_only)
    return 0

cmd_ui_send(args)

Send a JSON command to the UI daemon (reads from stdin or first arg).

Source code in src/orateur/cli.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def cmd_ui_send(args):
    """Send a JSON command to the UI daemon (reads from stdin or first arg)."""
    import sys as _sys

    from .paths import CACHE_DIR, CMD_FIFO

    payload = getattr(args, "json_data", None) or ""
    if not payload:
        payload = _sys.stdin.read().strip()
    if not payload:
        logger.error("No JSON data (pass as arg or stdin)")
        return 1
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    if not CMD_FIFO.exists():
        logger.error("UI daemon not running (FIFO not found). Run 'orateur ui' first.")
        return 1
    try:
        with open(CMD_FIFO, "w", encoding="utf-8") as fifo:
            # The daemon reads newline-delimited commands.
            fifo.write(payload if payload.endswith("\n") else payload + "\n")
    except OSError as e:
        logger.error("Failed to write to FIFO: %s", e)
        return 1
    return 0

config

Configuration manager for Orateur.

ConfigManager()

Manages application configuration and settings.

Source code in src/orateur/config.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def __init__(self):
    """Build the default settings, then load persisted values from disk."""
    # Canonical defaults for every supported setting.
    self.default_config = {
        "primary_shortcut": "SUPER+ALT+D",
        "secondary_shortcut": "SUPER+ALT+E",
        "tts_shortcut": "SUPER+ALT+T",
        "sts_shortcut": "SUPER+ALT+S",
        # Desktop app (Tauri) global shortcut to restart `orateur run`; same format as other shortcuts.
        "restart_daemon_shortcut": "SUPER+ALT+R",
        "recording_mode": "toggle",
        "grab_keys": False,
        "selected_device_path": None,
        "selected_device_name": None,
        "audio_device_id": None,
        "audio_device_name": None,
        "stt_backend": "pywhispercpp",
        "stt_model": "base",
        "stt_language": None,
        "stt_language_secondary": None,
        "stt_threads": 4,
        "stt_whisper_prompt": "Transcribe with proper capitalization.",
        "stt_whisper_prompt_secondary": None,
        "stt_whisper_verbose": False,
        "tts_backend": "pocket_tts",
        "tts_voice": "alba",
        "tts_volume": 1.0,
        # Use "none", "off", or "disabled" to skip Ollama (speech-to-speech unavailable).
        "llm_backend": "ollama",
        "llm_model": "llama3.2",
        "llm_system_prompt": "You are a helpful assistant. Respond concisely.",
        "llm_base_url": "http://localhost:11434",
        "mcpServers": {},
        "mcp_tools_url": None,
        "paste_mode": "ctrl_shift",
        "paste_keycode": 47,
        # Append UI events to ~/.cache/orateur/ui_events.jsonl (Quickshell, Tauri desktop, tail, etc.).
        "ui_events_mirror": True,
        # Spawn `quickshell -c orateur` when `orateur run` starts (e.g. systemd).
        "quickshell_autostart": False,
        # notify-send when `orateur run` is ready / on shutdown (set false for headless).
        "desktop_notifications": True,
    }

    self.config_dir = CONFIG_DIR
    self.config_file = CONFIG_FILE
    # Deep copy so later set_setting() calls never mutate default_config.
    self.config = copy.deepcopy(self.default_config)
    self._ensure_config_dir()
    # Load persisted settings (merge behavior defined in _load_config).
    self._load_config()
get_setting(key: str, default: Any = None) -> Any

Get a configuration setting.

Source code in src/orateur/config.py
 99
100
101
def get_setting(self, key: str, default: Any = None) -> Any:
    """Look up one configuration value, falling back to ``default``."""
    try:
        return self.config[key]
    except KeyError:
        return default
get_temp_directory() -> Path

Get the temporary directory for audio files.

Source code in src/orateur/config.py
107
108
109
110
111
112
def get_temp_directory(self) -> Path:
    """Return the temp directory for audio files, creating it when missing."""
    from .paths import TEMP_DIR

    temp_dir = TEMP_DIR
    temp_dir.mkdir(parents=True, exist_ok=True)
    return temp_dir
save_config() -> bool

Save current configuration to file.

Source code in src/orateur/config.py
89
90
91
92
93
94
95
96
97
def save_config(self) -> bool:
    """Persist the current configuration to the config file as JSON."""
    try:
        with open(self.config_file, "w", encoding="utf-8") as handle:
            json.dump(self.config, handle, indent=2)
    except Exception as e:
        log.error("Could not save config: %s", e)
        return False
    return True
set_setting(key: str, value: Any) -> None

Set a configuration setting.

Source code in src/orateur/config.py
103
104
105
def set_setting(self, key: str, value: Any) -> None:
    """Assign ``value`` to ``key`` in the in-memory configuration."""
    self.config.update({key: value})

desktop_notify

Best-effort desktop notifications (Linux: notify-send; macOS: AppleScript).

notify(summary: str, body: str = '', *, urgency: str = 'normal') -> None

Send a desktop notification when supported on this OS. Never raises.

Source code in src/orateur/desktop_notify.py
78
79
80
81
82
83
84
85
86
87
88
def notify(summary: str, body: str = "", *, urgency: str = "normal") -> None:
    """Send a desktop notification when supported on this OS. Never raises."""
    if os.environ.get("ORATEUR_NO_NOTIFY", "").strip() in ("1", "true", "yes"):
        return
    summary = _truncate(summary.strip() or _APP_NAME, 200)
    body = _truncate(body.strip(), 400) if body else ""

    if sys.platform == "darwin":
        _notify_macos(summary, body, urgency=urgency)
    else:
        _notify_linux(summary, body, urgency=urgency)

install_quickshell

Install Quickshell Orateur component when Quickshell is detected.

install_quickshell() -> bool

Install Orateur Quickshell component to ~/.config/quickshell/orateur/.

Detects Quickshell; if present, copies or symlinks quickshell/orateur/ from the repo to the config directory. Prefers symlink when ORATEUR_ROOT is set (development/editable install).

Source code in src/orateur/install_quickshell.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def install_quickshell() -> bool:
    """
    Install Orateur Quickshell component to ~/.config/quickshell/orateur/.

    Detects Quickshell; if present, copies or symlinks quickshell/orateur/
    from the repo to the config directory. Prefers symlink when ORATEUR_ROOT
    is set (development/editable install).
    """
    if not _detect_quickshell():
        log.debug("Quickshell not detected, skipping Quickshell install")
        return False

    project_root = _project_root()
    src = project_root / "quickshell" / "orateur"
    if not src.exists():
        log.warning("Quickshell source not found: %s", src)
        return False

    dest = ORATEUR_QUICKSHELL_DEST
    dest.parent.mkdir(parents=True, exist_ok=True)

    # ORATEUR_ROOT signals a development checkout; a symlink keeps the
    # installed component in sync with the repo.
    use_symlink = bool(os.environ.get("ORATEUR_ROOT"))

    # Clear any stale destination: keep a symlink that already points at
    # src, unlink a wrong symlink, and remove a previously copied tree.
    if dest.exists():
        if dest.is_symlink():
            if dest.resolve() == src.resolve():
                log.info("Quickshell Orateur already installed (symlink)")
                _write_orateur_bin_path(project_root)
                return True
            dest.unlink()
        else:
            shutil.rmtree(dest)

    try:
        if use_symlink:
            dest.symlink_to(src.resolve())
            log.info("Quickshell Orateur installed (symlink) → %s", dest)
        else:
            shutil.copytree(src, dest)
            log.info("Quickshell Orateur installed (copy) → %s", dest)
        _write_orateur_bin_path(project_root)
        return True
    except OSError as e:
        # Best-effort install: log and continue without the Quickshell UI.
        log.warning("Failed to install Quickshell component: %s", e)
        return False

install_stt

Setup-time installation of pywhispercpp with GPU support.

Installs one of the following: a build from source (absadiki/pywhispercpp) with CUDA on Linux x86_64 when CUDA is detected; a build from source with Metal on macOS Apple Silicon (arm64); or the PyPI (CPU) wheel otherwise.

When run via the launcher (installed users): installs into ~/.local/share/orateur/venv. When run via uv run (development): installs into project .venv.

download_whisper_model(model_name: Optional[str] = None) -> bool

Download ggml weights into pywhispercpp's MODELS_DIR (same layout as Model()).

Runs in a subprocess with the same Python as pip/pywhispercpp so (1) the package is visible immediately after pip install without restarting the parent process, and (2) we use the venv where STT packages were installed (active venv or fixed venv).

Source code in src/orateur/install_stt.py
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
def download_whisper_model(model_name: Optional[str] = None) -> bool:
    """Download ggml weights into pywhispercpp's MODELS_DIR (same layout as Model()).

    Runs in a subprocess with the same Python as pip/pywhispercpp so (1) the package is
    visible immediately after pip install without restarting the parent process, and (2) we
    use the venv where STT packages were installed (active venv or fixed venv).
    """
    py = _python_for_pip_install()
    if not py:
        log.error("No Python interpreter for model download")
        return False
    name = (model_name or "base").strip()
    # Small program executed in the target venv; exit code 2 = unknown model
    # name, 1 = download_model() returned a falsy path, 0 = success.
    code = f"""import sys
name = {name!r}
from pywhispercpp.constants import AVAILABLE_MODELS
from pywhispercpp.utils import download_model
if name not in AVAILABLE_MODELS:
    print("unknown model", file=sys.stderr)
    sys.exit(2)
p = download_model(name)
print(p or "")
sys.exit(0 if p else 1)
"""
    try:
        r = subprocess.run(
            [str(py), "-c", code],
            capture_output=True,
            text=True,
            timeout=600,  # weights can be large; allow up to 10 minutes
        )
        if r.returncode == 2:
            log.error("Unknown Whisper model %r (see pywhispercpp.constants.AVAILABLE_MODELS)", name)
            return False
        if r.returncode != 0:
            err = (r.stderr or "") + (r.stdout or "")
            # Distinguish "pywhispercpp missing in that venv" from a real
            # download failure so the user gets an actionable message.
            if "No module named" in err or "ModuleNotFoundError" in err:
                log.error("pywhispercpp is not installed in the target venv")
            else:
                log.error("download_model failed: %s", err[:800] if err else "unknown")
            return False
        out = (r.stdout or "").strip()
        if out:
            log.info("Whisper weights: %s", out)
        return True
    except (OSError, subprocess.TimeoutExpired) as e:
        log.error("Failed to download Whisper model %s: %s", name, e)
        return False

install_pywhispercpp(backend: Optional[str] = None, force: bool = False) -> bool

Detect GPU backend and install pywhispercpp.

backend: 'nvidia' = CUDA from source (Linux x86_64 + CUDA); 'metal' = Metal from source (macOS arm64); 'cpu' = PyPI CPU wheel; None = auto (CUDA on Linux+CUDA, Metal on Apple Silicon, else PyPI). force: If True, reinstall even when already installed.

Source code in src/orateur/install_stt.py
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def install_pywhispercpp(backend: Optional[str] = None, force: bool = False) -> bool:
    """Detect GPU backend and install pywhispercpp.

    backend: 'nvidia' = CUDA from source (Linux x86_64 + CUDA);
             'metal' = Metal from source (macOS arm64);
             'cpu' = PyPI CPU wheel;
             None = auto (CUDA on Linux+CUDA, Metal on Apple Silicon, else PyPI).
    force: If True, reinstall even when already installed.
    """
    if not force:
        installed, have_backend = _pywhispercpp_installed()
        if installed:
            # Determine which accelerated backend (if any) we would want,
            # then skip the install when the current one already matches.
            if backend == "cpu":
                need_cuda, need_metal = False, False
            elif backend == "nvidia":
                need_cuda, need_metal = True, False
            elif backend == "metal":
                need_cuda, need_metal = False, True
            else:
                need_cuda = bool(_detect_cuda_version() and _is_linux_x86_64())
                need_metal = bool(_is_apple_silicon())
            if need_cuda and have_backend == "cuda":
                log.info("pywhispercpp (CUDA) already installed, skipping")
                return True
            if need_metal and have_backend == "metal":
                log.info("pywhispercpp (Metal) already installed, skipping")
                return True
            if have_backend == "cpu" and not (need_cuda or need_metal):
                log.info("pywhispercpp (CPU) already installed, skipping")
                return True

    if backend == "cpu":
        return _install_from_pypi(force=force)

    if backend == "metal":
        if not _is_apple_silicon():
            log.error("Metal backend requires macOS on Apple Silicon (arm64)")
            return False
        return _build_pywhispercpp_metal_from_source(force=force)

    if backend == "nvidia":
        detected = _detect_cuda_version()
        if not detected:
            log.error("NVIDIA backend requested but no CUDA detected")
            return False
        if not _is_linux_x86_64():
            log.warning("CUDA build from source only supported on Linux x86_64. Using PyPI (CPU).")
            return _install_from_pypi(force=force)
        log.info("Detected CUDA %s -> building from source...", detected)
        return _build_pywhispercpp_cuda_from_source(force=force)

    # backend is None: auto-detect the best available option.
    if _is_apple_silicon():
        log.info("Apple Silicon detected -> building pywhispercpp with Metal from source...")
        return _build_pywhispercpp_metal_from_source(force=force)

    detected = _detect_cuda_version()
    if not detected:
        return _install_from_pypi(force=force)
    if not _is_linux_x86_64():
        log.warning("CUDA build only on Linux x86_64. Using PyPI (CPU).")
        return _install_from_pypi(force=force)
    log.info("Detected CUDA %s -> building from source...", detected)
    return _build_pywhispercpp_cuda_from_source(force=force)

llm

LLM backends for Speech-to-Speech.

LLMBackend(config: object)

Bases: ABC

Abstract base class for LLM backends (used in STS pipeline).

Subclasses store config as needed.

Source code in src/orateur/llm/base.py
10
11
def __init__(self, config: object) -> None:
    """No-op base initializer; subclasses store ``config`` as needed."""
generate(user_text: str, system_prompt: Optional[str] = None, model_override: Optional[str] = None) -> str abstractmethod

Generate a response from the LLM.

Args: user_text: The user's input (transcribed speech) system_prompt: Optional system prompt model_override: Optional model name override

Returns: Generated text response

Source code in src/orateur/llm/base.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@abstractmethod
def generate(
    self,
    user_text: str,
    system_prompt: Optional[str] = None,
    model_override: Optional[str] = None,
) -> str:
    """
    Produce a text reply from the backing LLM.

    Args:
        user_text: The user's input (transcribed speech)
        system_prompt: Optional system prompt
        model_override: Optional model name override

    Returns:
        Generated text response
    """
    ...
get_available_models() -> list[str]

Return list of available model names (if applicable).

Source code in src/orateur/llm/base.py
42
43
44
def get_available_models(self) -> list[str]:
    """Names of models this backend offers; empty when not applicable."""
    return list()
initialize(config) -> bool abstractmethod

Initialize the backend. Returns True on success.

Source code in src/orateur/llm/base.py
13
14
15
16
@abstractmethod
def initialize(self, config) -> bool:
    """Prepare the backend for use; True on success."""
    ...
is_ready() -> bool

Check if backend is ready.

Source code in src/orateur/llm/base.py
38
39
40
def is_ready(self) -> bool:
    """Report readiness; the base implementation is always ready."""
    return True

get_llm_backend(name: str, config) -> Optional[LLMBackend]

Get and initialize an LLM backend by name.

Source code in src/orateur/llm/registry.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def get_llm_backend(name: str, config) -> Optional[LLMBackend]:
    """Look up, construct, and initialize an LLM backend by name."""
    if is_llm_disabled(name):
        return None
    if name == "mcp":
        # Legacy alias kept so old configs keep working.
        log.warning("llm_backend 'mcp' is deprecated, using 'ollama' instead")
        name = "ollama"
    backend_cls = _BACKENDS.get(name)
    if backend_cls is None:
        return None
    instance = backend_cls(config)
    return instance if instance.initialize(config) else None

is_llm_disabled(name: str) -> bool

True when config explicitly turns off the LLM (no Ollama connection).

Source code in src/orateur/llm/registry.py
18
19
20
21
22
def is_llm_disabled(name: str) -> bool:
    """True when config explicitly turns off the LLM (no Ollama connection)."""
    if isinstance(name, str):
        return name.strip().lower() in _DISABLED_NAMES
    # Non-string values (e.g. None) never count as an explicit disable.
    return False

list_llm_backends() -> list[str]

List registered LLM backend names (includes explicit disable sentinel).

Source code in src/orateur/llm/registry.py
41
42
43
def list_llm_backends() -> list[str]:
    """List registered LLM backend names (includes explicit disable sentinel)."""
    names = ["none"]
    names.extend(_BACKENDS.keys())
    return names

base

Abstract LLM backend interface.

LLMBackend(config: object)

Bases: ABC

Abstract base class for LLM backends (used in STS pipeline).

Subclasses store config as needed.

Source code in src/orateur/llm/base.py
10
11
def __init__(self, config: object) -> None:
    """No-op base initializer; subclasses store ``config`` as needed."""
generate(user_text: str, system_prompt: Optional[str] = None, model_override: Optional[str] = None) -> str abstractmethod

Generate a response from the LLM.

Args: user_text: The user's input (transcribed speech) system_prompt: Optional system prompt model_override: Optional model name override

Returns: Generated text response

Source code in src/orateur/llm/base.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@abstractmethod
def generate(
    self,
    user_text: str,
    system_prompt: Optional[str] = None,
    model_override: Optional[str] = None,
) -> str:
    """
    Produce a text reply from the backing LLM.

    Args:
        user_text: The user's input (transcribed speech)
        system_prompt: Optional system prompt
        model_override: Optional model name override

    Returns:
        Generated text response
    """
    ...
get_available_models() -> list[str]

Return list of available model names (if applicable).

Source code in src/orateur/llm/base.py
42
43
44
def get_available_models(self) -> list[str]:
    """Names of models this backend offers; empty when not applicable."""
    return list()
initialize(config) -> bool abstractmethod

Initialize the backend. Returns True on success.

Source code in src/orateur/llm/base.py
13
14
15
16
@abstractmethod
def initialize(self, config) -> bool:
    """Prepare the backend for use; True on success."""
    ...
is_ready() -> bool

Check if backend is ready.

Source code in src/orateur/llm/base.py
38
39
40
def is_ready(self) -> bool:
    """Report readiness; the base implementation is always ready."""
    return True

mcp_tools

MCP tools: fetch from servers, convert to OpenAI format, execute tool calls.

mcp_connections(config: Any) async

Connect to MCP servers, yield (ollama_tools, tool_to_server, call_tool). Uses: async with stdio_client(params) as (read, write), ClientSession(read, write).

Source code in src/orateur/llm/mcp_tools.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
@asynccontextmanager
async def mcp_connections(config: Any):
    """
    Connect to MCP servers, yield (ollama_tools, tool_to_server, call_tool).
    Uses: async with stdio_client(params) as (read, write), ClientSession(read, write).

    Servers come from two config sources:
      * ``mcpServers``: mapping of name -> {command, args, env} (stdio transport)
      * ``mcp_tools_url``: a single SSE endpoint, registered under the name "sse"

    Individual server failures are logged and skipped; the context manager
    always yields, possibly with zero tools.
    """
    from mcp import ClientSession
    from mcp.client.stdio import StdioServerParameters, stdio_client

    connections: dict[str, Any] = {}  # server_name -> session
    tool_to_server: dict[str, str] = {}

    async with AsyncExitStack() as stack:
        # Connect stdio servers
        servers = config.get_setting("mcpServers") or {}
        if isinstance(servers, dict):
            for name, cfg in servers.items():
                if not isinstance(cfg, dict):
                    continue
                cmd = cfg.get("command")
                args = cfg.get("args")
                if not cmd:
                    continue
                if not isinstance(args, list):
                    # BUGFIX: a missing "args" key yields None; previously None
                    # fell into the str() comprehension and raised TypeError.
                    args = [] if args is None else [str(a) for a in args]
                env_cfg = cfg.get("env")
                if isinstance(env_cfg, dict) and env_cfg:
                    # Merge configured env over the inherited process env.
                    merged_env = {
                        **os.environ,
                        **{str(k): str(v) for k, v in env_cfg.items()},
                    }
                    params = StdioServerParameters(command=str(cmd), args=args, env=merged_env)
                else:
                    params = StdioServerParameters(command=str(cmd), args=args)
                try:
                    read, write = await stack.enter_async_context(stdio_client(params))
                    session = ClientSession(read, write)
                    await stack.enter_async_context(session)
                    await session.initialize()
                    connections[name] = session
                    log.info("MCP connected to stdio server '%s'", name)
                except Exception as e:
                    # Best-effort: one bad server must not block the others.
                    log.warning("MCP stdio server '%s' failed: %s", name, e)

        # Connect SSE server
        url = config.get_setting("mcp_tools_url")
        if url and isinstance(url, str) and url.strip():
            try:
                from mcp.client.sse import sse_client

                read, write = await stack.enter_async_context(sse_client(url.strip()))
                session = ClientSession(read, write)
                await stack.enter_async_context(session)
                await session.initialize()
                connections["sse"] = session
                log.info("MCP connected to SSE server at %s", url)
            except ImportError:
                log.warning("MCP SSE client not available")
            except Exception as e:
                log.warning("MCP SSE server failed: %s", e)

        async def fetch_tools() -> tuple[list[dict], dict[str, str]]:
            # Collect tools from all connected servers, converting each to the
            # OpenAI/Ollama tool schema and remembering which server owns it.
            ollama_tools: list[dict] = []
            for server_name, session in connections.items():
                try:
                    result = await session.list_tools()
                    tools = getattr(result, "tools", []) or []
                    for t in tools:
                        name = getattr(t, "name", "")
                        if name:
                            ollama_tools.append(_mcp_tool_to_openai(t))
                            tool_to_server[name] = server_name
                    log.info("MCP server '%s': %d tools", server_name, len(tools))
                except Exception as e:
                    log.warning("MCP list_tools failed for '%s': %s", server_name, e)
            return (ollama_tools, tool_to_server)

        async def call_tool(server_name: str, tool_name: str, arguments: dict[str, Any] | None) -> str:
            # Execute one tool call; errors come back as strings so the LLM
            # loop can surface them instead of crashing.
            if server_name not in connections:
                return f"Error: server '{server_name}' not connected"
            session = connections[server_name]
            args = arguments if isinstance(arguments, dict) else {}
            try:
                result = await session.call_tool(tool_name, args)
                return _extract_text_from_result(result)
            except Exception as e:
                log.warning("MCP call_tool %s failed: %s", tool_name, e)
                return f"Error: {e}"

        ollama_tools, tool_to_server = await fetch_tools()
        yield (ollama_tools, tool_to_server, call_tool)

ollama

Ollama LLM backend.

OllamaBackend(config)

Bases: LLMBackend

Ollama local LLM with optional MCP tools.

Source code in src/orateur/llm/ollama.py
53
54
55
56
def __init__(self, config):
    """Store config; the backend becomes ready only after initialize()."""
    super().__init__(config)
    self.config = config
    # Flipped to True by initialize() once Ollama is reachable.
    self.ready = False

registry

LLM backend registry.

get_llm_backend(name: str, config) -> Optional[LLMBackend]

Get and initialize an LLM backend by name.

Source code in src/orateur/llm/registry.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def get_llm_backend(name: str, config) -> Optional[LLMBackend]:
    """Get and initialize an LLM backend by name."""
    if is_llm_disabled(name):
        return None
    # "mcp" was folded into the ollama backend; keep old configs working.
    if name == "mcp":
        log.warning("llm_backend 'mcp' is deprecated, using 'ollama' instead")
        name = "ollama"
    backend_cls = _BACKENDS.get(name)
    if backend_cls is None:
        return None
    instance = backend_cls(config)
    return instance if instance.initialize(config) else None
is_llm_disabled(name: str) -> bool

True when config explicitly turns off the LLM (no Ollama connection).

Source code in src/orateur/llm/registry.py
18
19
20
21
22
def is_llm_disabled(name: str) -> bool:
    """True when config explicitly turns off the LLM (no Ollama connection)."""
    if not isinstance(name, str):
        return False
    normalized = name.strip().lower()
    return normalized in _DISABLED_NAMES
list_llm_backends() -> list[str]

List registered LLM backend names (includes explicit disable sentinel).

Source code in src/orateur/llm/registry.py
41
42
43
def list_llm_backends() -> list[str]:
    """List registered LLM backend names (includes explicit disable sentinel)."""
    names = ["none"]  # sentinel meaning "LLM explicitly disabled"
    names.extend(_BACKENDS.keys())
    return names
register_llm_backend(name: str, backend_cls: Type[LLMBackend]) -> None

Register a new LLM backend.

Source code in src/orateur/llm/registry.py
46
47
48
def register_llm_backend(name: str, backend_cls: Type[LLMBackend]) -> None:
    """Register a new LLM backend."""
    # Later registrations overwrite earlier ones under the same name.
    _BACKENDS[name] = backend_cls

log

Application-wide logging configuration.

All orateur.* log records go to stderr. When you run orateur run in a terminal, you should see them in that terminal. Set ORATEUR_LOG_LEVEL=DEBUG for more detail.

ensure_logging_configured() -> None

Ensure the orateur logger has a stderr handler.

orateur.cli calls setup_logging() first; this is a fallback when run() is entered without the CLI (tests, embedding). Safe to call after setup_logging (no-op).

Source code in src/orateur/log.py
46
47
48
49
50
51
52
53
54
def ensure_logging_configured() -> None:
    """Ensure the ``orateur`` logger has a stderr handler.

    ``orateur.cli`` calls ``setup_logging()`` first; this is a fallback for
    entering ``run()`` without the CLI (tests, embedding). Calling it after
    ``setup_logging`` is a no-op.
    """
    if not logging.getLogger("orateur").handlers:
        setup_logging()

get_logger(name: str) -> logging.Logger

Get a logger for the given module name.

Source code in src/orateur/log.py
39
40
41
42
43
def get_logger(name: str) -> logging.Logger:
    """Return a logger namespaced under the ``orateur`` hierarchy."""
    full_name = name if name.startswith("orateur") else f"orateur.{name}"
    return logging.getLogger(full_name)

setup_logging(level: str | int | None = None, format_string: str = '%(levelname)s - %(name)s - %(message)s') -> None

Configure logging for the application.

Logs to stderr. Level can be set via ORATEUR_LOG_LEVEL env var (DEBUG, INFO, WARNING, ERROR).

Source code in src/orateur/log.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def setup_logging(
    level: str | int | None = None,
    format_string: str = "%(levelname)s - %(name)s - %(message)s",
) -> None:
    """Configure logging for the application.

    Logs to stderr. Level can be set via ORATEUR_LOG_LEVEL env var
    (DEBUG, INFO, WARNING, ERROR).
    """
    if level is None:
        level = os.environ.get("ORATEUR_LOG_LEVEL", "INFO")
    if isinstance(level, str):
        level = getattr(logging, level.upper(), logging.INFO)

    root = logging.getLogger("orateur")
    root.setLevel(level)
    root.handlers.clear()

    handler = logging.StreamHandler(sys.stderr)
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter(format_string))
    root.addHandler(handler)

    # Prevent propagation to root logger (avoids duplicate logs)
    root.propagate = False

main

Main application loop for Orateur.

run(config: ConfigManager | None = None) -> None

Run the main loop (used by systemd).

Source code in src/orateur/main.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def run(config: ConfigManager | None = None) -> None:
    """Run the main loop (used by systemd)."""
    log_config.ensure_logging_configured()
    config = config or ConfigManager()

    ui_mirror.reset_ui_events_file()

    log.info("Loading STT...")
    stt = get_stt_backend(config.get_setting("stt_backend", "pywhispercpp"), config)
    if not stt or not stt.is_ready():
        log.error("STT failed to initialize")
        desktop_notify(
            "Cannot start — STT failed",
            "Speech-to-text did not initialize. Check logs and your STT backend.",
            urgency="critical",
        )
        sys.exit(1)

    log.info("Loading TTS...")
    tts = get_tts_backend(config.get_setting("tts_backend", "pocket_tts"), config)
    if not tts or not tts.is_ready():
        log.warning("TTS not ready - speak/sts will be limited")

    llm_name = config.get_setting("llm_backend", "ollama")
    if is_llm_disabled(llm_name):
        log.info("LLM disabled (llm_backend=%s)", llm_name)
        llm = None
    else:
        log.info("Loading LLM...")
        llm = get_llm_backend(llm_name, config)
        if not llm or not llm.is_ready():
            log.warning("LLM not ready - sts will be limited")

    audio = AudioCapture(config=config)
    injector = TextInjector(config)

    recording_for = [None]  # "stt" | "stt_secondary" | "sts" | None
    tts_active = [False]
    tts_lock = threading.Lock()

    def m(event: str, **payload) -> None:
        ui_mirror.send(config, event, **payload)

    def on_primary():
        if recording_for[0] == "stt":
            recording_for[0] = None
            data = audio.stop_recording()
            if data is None:
                m("error", message="No audio recorded")
                return
            m("recording_stopped", levels=audio_to_levels(data, 60))
            m("transcribing")
            try:
                text = stt.transcribe(data)
            except Exception as e:
                log.exception("Transcription failed")
                m("error", message=str(e))
                return
            m("transcribed", text=text or "")
            if text:
                injector.inject_text(text)
        else:
            recording_for[0] = "stt"

            def on_level(rms: float) -> None:
                m("recording", level=rms)

            if audio.start_recording(level_callback=on_level):
                m("recording_started", mode="stt")
            else:
                m("error", message="Failed to start recording")

    def on_secondary():
        if recording_for[0] == "stt_secondary":
            recording_for[0] = None
            data = audio.stop_recording()
            if data is None:
                m("error", message="No audio recorded")
                return
            m("recording_stopped", levels=audio_to_levels(data, 60))
            m("transcribing")
            lang = config.get_setting("stt_language_secondary")
            prompt = config.get_setting("stt_whisper_prompt_secondary")
            try:
                text = stt.transcribe(data, language_override=lang, prompt_override=prompt)
            except Exception as e:
                log.exception("Transcription failed")
                m("error", message=str(e))
                return
            m("transcribed", text=text or "")
            if text:
                injector.inject_text(text)
        else:
            recording_for[0] = "stt_secondary"

            def on_level_sec(rms: float) -> None:
                m("recording", level=rms)

            if audio.start_recording(level_callback=on_level_sec):
                m("recording_started", mode="stt")
            else:
                m("error", message="Failed to start recording")

    def on_sts():
        if recording_for[0] == "sts":
            recording_for[0] = None
            data = audio.stop_recording()
            if data is None:
                m("error", message="No audio recorded")
                return
            m("recording_stopped", levels=audio_to_levels(data, 60))

            def ui_m(ev: str, **kw) -> None:
                ui_mirror.send(config, ev, **kw)

            run_sts(config, data, stt=stt, tts=tts, llm=llm, ui_mirror=ui_m)
        else:
            recording_for[0] = "sts"

            def on_level_sts(rms: float) -> None:
                m("recording", level=rms)

            if audio.start_recording(level_callback=on_level_sts):
                m("recording_started", mode="sts")
            else:
                m("error", message="Failed to start recording")

    def on_tts():
        log.info("TTS shortcut fired")
        if not tts or not tts.is_ready():
            log.warning("TTS shortcut ignored: backend not ready")
            m("error", message="TTS not ready")
            return
        with tts_lock:
            if tts_active[0]:
                log.info("TTS: stop playback (shortcut pressed again)")
                tts.stop_playback()
                return
            text = _get_text_from_selection(config)
            if not text:
                log.info("TTS shortcut: no text in clipboard or selection")
                m("error", message="No text to read (copy text first)")
                return
            tts_active[0] = True

        nchars = len(text)
        preview = text[:80].replace("\n", " ")
        if len(text) > 80:
            preview += "…"
        log.info("TTS: speaking %d characters: %s", nchars, preview)

        duration_sec = tts.estimate_duration(text)
        m("tts_estimate", duration_sec=duration_sec)
        m("tts_playing")

        def on_lvl(level: float) -> None:
            m("tts_level", level=level)

        ok = False
        try:
            try:
                ok = tts.synthesize_and_play(text, level_callback=on_lvl)
            except TypeError:
                ok = tts.synthesize_and_play(text)
        except Exception:
            log.exception("TTS playback raised an exception")
            ok = False
        finally:
            with tts_lock:
                tts_active[0] = False
            m("tts_done", success=bool(ok))
        log.info("TTS: finished ok=%s", ok)

    shortcuts = ShortcutManager(config)
    shortcuts.register("primary", config.get_setting("primary_shortcut"), on_primary)
    shortcuts.register("secondary", config.get_setting("secondary_shortcut"), on_secondary)
    shortcuts.register("tts", config.get_setting("tts_shortcut"), on_tts)
    shortcuts.register("sts", config.get_setting("sts_shortcut"), on_sts)

    if not shortcuts.start():
        desktop_notify(
            "Cannot start — shortcuts failed",
            "Global hotkeys could not start. Check accessibility permissions, pynput, and shortcut settings.",
            urgency="critical",
        )
        sys.exit(1)

    log.info(
        "Orateur ready — shortcuts active: primary=%r secondary=%r tts=%r sts=%r",
        config.get_setting("primary_shortcut"),
        config.get_setting("secondary_shortcut"),
        config.get_setting("tts_shortcut"),
        config.get_setting("sts_shortcut"),
    )
    if config.get_setting("desktop_notifications", True):
        desktop_notify("Orateur started", "Speech shortcuts are active.", urgency="low")

    quickshell_proc = [None]
    if config.get_setting("quickshell_autostart", False):
        quickshell_proc[0] = quickshell_spawn.start_quickshell()

    shutdown_requested = [False]

    def shutdown(sig, frame):
        if shutdown_requested[0]:
            os._exit(0)
        shutdown_requested[0] = True

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    try:
        while not shutdown_requested[0]:
            time.sleep(0.2)
    except KeyboardInterrupt:
        pass
    finally:
        log.info("Shutting down...")
        if config.get_setting("desktop_notifications", True):
            desktop_notify("Orateur stopped", "Speech shortcuts are inactive.", urgency="low")
        quickshell_spawn.stop_quickshell(quickshell_proc[0])
        shortcuts.stop()
        # Bypass Python interpreter shutdown to avoid C++ destructor crashes
        # (pywhispercpp/ggml and PyTorch can crash when daemon threads are
        # abruptly terminated during normal exit)
        os._exit(0)

paths

Centralized path constants for Orateur with XDG Base Directory support.

quickshell_spawn

Optional Quickshell child process for orateur run.

start_quickshell() -> Optional[subprocess.Popen]

Spawn quickshell -c orateur detached from our stdin; returns None if not found.

Source code in src/orateur/quickshell_spawn.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def start_quickshell() -> Optional[subprocess.Popen]:
    """Launch ``quickshell -c orateur`` as a detached child process.

    Returns the process handle, or None when quickshell is not installed
    or exits immediately after launch.
    """
    argv = _quickshell_argv()
    if not argv:
        log.warning("quickshell not found in PATH; install Quickshell or extend PATH")
        return None
    try:
        child = subprocess.Popen(
            argv,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            env=None,
            start_new_session=True,  # detach from our session / signals
        )
    except OSError as e:
        log.warning("Failed to start Quickshell: %s", e)
        return None
    # A crash-on-start shows up as an early exit within this grace period.
    time.sleep(0.15)
    if child.poll() is not None:
        log.warning("Quickshell exited immediately (code %s)", child.returncode)
        return None
    log.info("Started Quickshell (pid %s)", child.pid)
    return child

shortcuts

Global keyboard shortcuts: Linux uses evdev; macOS and Windows use pynput.

EvdevShortcutManager(config)

Listens for multiple shortcuts and invokes callbacks (Linux /dev/input).

Source code in src/orateur/shortcuts.py
149
150
151
152
153
154
155
156
157
158
def __init__(self, config):
    """Set up empty shortcut tables and device state (Linux /dev/input)."""
    self.config = config
    # Registered shortcuts: name -> (key combo as frozenset, callback).
    self.shortcuts: dict[str, tuple[frozenset, Callable]] = {}
    # Monitored input devices and the keycodes currently held down.
    self.devices: list[InputDevice] = []
    self.pressed_keys: set[int] = set()
    # Per-shortcut trigger state plus debounce bookkeeping.
    self.active: dict[str, bool] = {}
    self.last_trigger: dict[str, float] = {}
    self.debounce = 0.1  # seconds between accepted triggers
    # Reader thread and its stop signal; started later, not here.
    self.stop_event = threading.Event()
    self.thread: Optional[threading.Thread] = None

PynputShortcutManager(config)

Listens for global hotkeys via pynput (macOS / Windows).

Source code in src/orateur/shortcuts.py
275
276
277
278
279
280
281
def __init__(self, config):
    """Prepare empty hotkey state; the pynput listener starts elsewhere."""
    self.config = config
    # Shortcut name -> callback, populated by register().
    self._hotkey_map: dict[str, Callable[[], None]] = {}
    # Debounce bookkeeping: last trigger time per shortcut.
    self.last_trigger: dict[str, float] = {}
    self.debounce = 0.1  # seconds between accepted triggers
    self._listener = None
    self._lock = threading.Lock()

UnsupportedShortcutManager(config)

Placeholder when the platform has no backend.

Source code in src/orateur/shortcuts.py
359
360
def __init__(self, config):
    """Keep config only; this placeholder backend supports no hotkeys."""
    self.config = config

sts_pipeline

Speech-to-Speech pipeline: STT -> LLM -> TTS.

run_sts(config, audio_data, sample_rate: int = 16000, language_override: Optional[str] = None, stt=None, tts=None, llm=None, ui_mirror: Optional[Callable[..., None]] = None) -> bool

Run Speech-to-Speech: transcribe -> LLM -> TTS.

If stt, tts, or llm are provided and ready, they are reused. Otherwise new backends are created (for CLI compatibility).

Returns True if audio was played successfully.

Source code in src/orateur/sts_pipeline.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def run_sts(
    config,
    audio_data,
    sample_rate: int = 16000,
    language_override: Optional[str] = None,
    stt=None,
    tts=None,
    llm=None,
    ui_mirror: Optional[Callable[..., None]] = None,
) -> bool:
    """
    Run Speech-to-Speech: transcribe -> LLM -> TTS.

    If stt, tts, or llm are provided and ready, they are reused.
    Otherwise new backends are created (for CLI compatibility).

    Returns True if audio was played successfully.
    """
    stt_name = config.get_setting("stt_backend", "pywhispercpp")
    tts_name = config.get_setting("tts_backend", "pocket_tts")
    llm_name = config.get_setting("llm_backend", "ollama")

    def _m(event: str, **kw) -> None:
        # Forward pipeline progress events to the UI when a mirror is given.
        if ui_mirror:
            ui_mirror(event, **kw)

    # Reuse injected backends when usable; otherwise build fresh ones.
    if stt is None or not stt.is_ready():
        stt = get_stt_backend(stt_name, config)
    if tts is None or not tts.is_ready():
        tts = get_tts_backend(tts_name, config)
    if llm is None or not llm.is_ready():
        llm = get_llm_backend(llm_name, config)

    # All three stages are required; bail out with a UI error otherwise.
    if not stt or not stt.is_ready():
        log.error("STT not ready")
        _m("error", message="STT not ready")
        return False
    if not tts or not tts.is_ready():
        log.error("TTS not ready")
        _m("error", message="TTS not ready")
        return False
    if not llm or not llm.is_ready():
        log.error("LLM not ready")
        _m("error", message="LLM not ready")
        return False

    # Stage 1: speech -> text.
    _m("transcribing")
    text = stt.transcribe(audio_data, sample_rate, language_override)
    if not text or not text.strip():
        log.error("No transcription")
        _m("error", message="No transcription")
        return False

    # Stage 2: text -> LLM response.
    _m("transcribed", text=text)
    system_prompt = config.get_setting("llm_system_prompt", "You are a helpful assistant. Respond concisely.")
    response = llm.generate(text, system_prompt=system_prompt)
    if not response or not response.strip():
        log.error("No LLM response")
        _m("error", message="No LLM response")
        return False

    # Stage 3: response -> audio playback.
    duration_sec = tts.estimate_duration(response)
    _m("tts_estimate", duration_sec=duration_sec)
    _m("tts_playing")

    def on_tts_level(level: float) -> None:
        _m("tts_level", level=level)

    try:
        try:
            ok = tts.synthesize_and_play(
                response,
                level_callback=on_tts_level if ui_mirror else None,
            )
        except TypeError:
            # Backend may not accept level_callback; retry without it.
            ok = tts.synthesize_and_play(response)
    except Exception as e:
        log.exception("TTS playback failed")
        _m("error", message=str(e))
        return False
    _m("tts_done", success=ok)
    return bool(ok)

stt

STT (Speech-to-Text) backends.

STTBackend(config: object)

Bases: ABC

Abstract base class for Speech-to-Text backends.

Subclasses store config as needed.

Source code in src/orateur/stt/base.py
12
13
def __init__(self, config: object) -> None:
    """Intentionally a no-op; subclasses store ``config`` as needed."""
get_available_models() -> list[str]

Return list of available model names for this backend.

Source code in src/orateur/stt/base.py
46
47
48
def get_available_models(self) -> list[str]:
    """Model names this STT backend can use; empty by default."""
    models: list[str] = []
    return models
initialize(config) -> bool abstractmethod

Initialize the backend. Returns True on success.

Source code in src/orateur/stt/base.py
15
16
17
18
@abstractmethod
def initialize(self, config) -> bool:
    """Prepare the backend for transcription; True indicates success."""
    ...
is_ready() -> bool

Check if backend is ready for transcription.

Source code in src/orateur/stt/base.py
42
43
44
def is_ready(self) -> bool:
    """Report readiness for transcription; assumed usable by default."""
    return True
transcribe(audio_data: np.ndarray, sample_rate: int = 16000, language_override: Optional[str] = None, prompt_override: Optional[str] = None) -> str abstractmethod

Transcribe audio to text.

Args: audio_data: NumPy array of float32 audio samples (mono) sample_rate: Sample rate (typically 16000) language_override: Optional language code (e.g. 'en', 'fr') prompt_override: Optional Whisper initial prompt override

Returns: Transcribed text string

Source code in src/orateur/stt/base.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@abstractmethod
def transcribe(
    self,
    audio_data: np.ndarray,
    sample_rate: int = 16000,
    language_override: Optional[str] = None,
    prompt_override: Optional[str] = None,
) -> str:
    """
    Convert recorded audio into text.

    Args:
        audio_data: NumPy array of float32 audio samples (mono)
        sample_rate: Sample rate (typically 16000)
        language_override: Optional language code (e.g. 'en', 'fr')
        prompt_override: Optional Whisper initial prompt override

    Returns:
        Transcribed text string
    """
    ...

get_stt_backend(name: str, config) -> Optional[STTBackend]

Get and initialize an STT backend by name.

Source code in src/orateur/stt/registry.py
13
14
15
16
17
18
19
20
21
def get_stt_backend(name: str, config) -> Optional[STTBackend]:
    """Get and initialize an STT backend by name."""
    backend_cls = _BACKENDS.get(name)
    if backend_cls is None:
        return None
    instance = backend_cls(config)
    # Only hand back backends that initialized successfully.
    return instance if instance.initialize(config) else None

list_stt_backends() -> list[str]

List registered STT backend names.

Source code in src/orateur/stt/registry.py
24
25
26
def list_stt_backends() -> list[str]:
    """List registered STT backend names."""
    return [*_BACKENDS]

base

Abstract STT backend interface.

STTBackend(config: object)

Bases: ABC

Abstract base class for Speech-to-Text backends.

Subclasses store config as needed.

Source code in src/orateur/stt/base.py
12
13
def __init__(self, config: object) -> None:
    """Intentionally a no-op; subclasses store ``config`` as needed."""
get_available_models() -> list[str]

Return list of available model names for this backend.

Source code in src/orateur/stt/base.py
46
47
48
def get_available_models(self) -> list[str]:
    """Model names this STT backend can use; empty by default."""
    models: list[str] = []
    return models
initialize(config) -> bool abstractmethod

Initialize the backend. Returns True on success.

Source code in src/orateur/stt/base.py
15
16
17
18
@abstractmethod
def initialize(self, config) -> bool:
    """Prepare the backend for transcription; True indicates success."""
    ...
is_ready() -> bool

Check if backend is ready for transcription.

Source code in src/orateur/stt/base.py
42
43
44
def is_ready(self) -> bool:
    """Report readiness for transcription; assumed usable by default."""
    return True
transcribe(audio_data: np.ndarray, sample_rate: int = 16000, language_override: Optional[str] = None, prompt_override: Optional[str] = None) -> str abstractmethod

Transcribe audio to text.

Args: audio_data: NumPy array of float32 audio samples (mono) sample_rate: Sample rate (typically 16000) language_override: Optional language code (e.g. 'en', 'fr') prompt_override: Optional Whisper initial prompt override

Returns: Transcribed text string

Source code in src/orateur/stt/base.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@abstractmethod
def transcribe(
    self,
    audio_data: np.ndarray,
    sample_rate: int = 16000,
    language_override: Optional[str] = None,
    prompt_override: Optional[str] = None,
) -> str:
    """
    Convert recorded audio into text.

    Args:
        audio_data: NumPy array of float32 audio samples (mono)
        sample_rate: Sample rate (typically 16000)
        language_override: Optional language code (e.g. 'en', 'fr')
        prompt_override: Optional Whisper initial prompt override

    Returns:
        Transcribed text string
    """
    ...

pywhispercpp

pywhispercpp STT backend.

PyWhisperCppBackend(config)

Bases: STTBackend

Whisper via pywhispercpp (local CPU, or GPU via CUDA / Metal / Vulkan per build).

Source code in src/orateur/stt/pywhispercpp.py
29
30
31
32
33
34
def __init__(self, config):
    """Store config; the model is loaded lazily, not here."""
    super().__init__(config)
    self.config = config
    # Loaded whisper model handle and the name it was loaded from.
    self._model = None
    self._current_model = None
    # Flipped to True once a model has been loaded successfully.
    self.ready = False
whisper_models_dir() -> Path

Where pywhispercpp stores ggml weights (uses platformdirs; not ~/.local on macOS).

Source code in src/orateur/stt/pywhispercpp.py
16
17
18
19
20
21
22
23
def whisper_models_dir() -> Path:
    """Directory where pywhispercpp keeps its ggml model weights.

    pywhispercpp resolves this via platformdirs (so it is not ~/.local on
    macOS); when the package is absent we fall back to the Linux-style path.
    """
    try:
        from pywhispercpp.constants import MODELS_DIR
    except ImportError:
        return Path.home() / ".local" / "share" / "pywhispercpp" / "models"
    return Path(MODELS_DIR)

registry

STT backend registry - plug-n-play discovery.

get_stt_backend(name: str, config) -> Optional[STTBackend]

Get and initialize an STT backend by name.

Source code in src/orateur/stt/registry.py
13
14
15
16
17
18
19
20
21
def get_stt_backend(name: str, config) -> Optional[STTBackend]:
    """Get and initialize an STT backend by name."""
    backend_cls = _BACKENDS.get(name)
    if backend_cls is None:
        return None
    instance = backend_cls(config)
    # Only hand back backends that initialized successfully.
    return instance if instance.initialize(config) else None
list_stt_backends() -> list[str]

List registered STT backend names.

Source code in src/orateur/stt/registry.py
24
25
26
def list_stt_backends() -> list[str]:
    """List registered STT backend names."""
    return [*_BACKENDS]
register_stt_backend(name: str, backend_cls: Type[STTBackend]) -> None

Register a new STT backend.

Source code in src/orateur/stt/registry.py
29
30
31
def register_stt_backend(name: str, backend_cls: Type[STTBackend]) -> None:
    """Register a new STT backend."""
    # Later registrations overwrite earlier ones under the same name.
    _BACKENDS[name] = backend_cls

text_injector

Text injection via clipboard + synthetic paste (ydotool on Linux, pynput on macOS/Windows).

TextInjector(config=None)

Inject text into focused app via clipboard + paste hotkey.

Source code in src/orateur/text_injector.py
30
31
32
def __init__(self, config=None):
    """Cache config and probe once for the ydotool binary (Linux paste path)."""
    self.config = config
    # Probed once at construction, not on every injection.
    self.ydotool_available = shutil.which("ydotool") is not None

tts

TTS (Text-to-Speech) backends.

TTSBackend(config: object)

Bases: ABC

Abstract base class for Text-to-Speech backends.

Subclasses store config as needed.

Source code in src/orateur/tts/base.py
11
12
def __init__(self, config: object) -> None:
    """Intentionally a no-op; subclasses store ``config`` as needed."""
estimate_duration(text: str) -> float

Estimate TTS duration in seconds from text (heuristic). ~150 words/min ≈ 2.5 words/sec; fallback ~15 chars/sec (len/15).

Source code in src/orateur/tts/base.py
73
74
75
76
77
78
79
80
81
82
83
def estimate_duration(self, text: str) -> float:
    """
    Estimate TTS duration in seconds from text (heuristic).
    ~150 words/min ≈ 2.5 words/sec; fallback ~4 chars/sec.
    """
    if not text or not text.strip():
        return 0.0
    words = len(text.split())
    if words > 0:
        return words / 2.5
    return len(text) / 15.0
get_available_voices() -> list[str]

Return list of available voice names.

Source code in src/orateur/tts/base.py
89
90
91
def get_available_voices(self) -> list[str]:
    """Return list of available voice names."""
    return []
initialize(config) -> bool abstractmethod

Initialize the backend. Returns True on success.

Source code in src/orateur/tts/base.py
14
15
16
17
@abstractmethod
def initialize(self, config) -> bool:
    """Initialize the backend. Returns True on success."""
    pass
is_ready() -> bool

Check if backend is ready.

Source code in src/orateur/tts/base.py
85
86
87
def is_ready(self) -> bool:
    """Check if backend is ready."""
    return True
stop_playback() -> None

Stop in-flight playback if supported. Override in backends that play audio.

Source code in src/orateur/tts/base.py
93
94
95
def stop_playback(self) -> None:
    """Stop in-flight playback if supported. Override in backends that play audio."""
    pass
synthesize(text: str, voice: Optional[str] = None) -> Optional[Path] abstractmethod

Synthesize text to audio file.

Args: `text` — text to speak; `voice` — optional voice name (uses default if None)

Returns: Path to WAV file, or None on failure

Source code in src/orateur/tts/base.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@abstractmethod
def synthesize(
    self,
    text: str,
    voice: Optional[str] = None,
) -> Optional[Path]:
    """
    Synthesize text to audio file.

    Args:
        text: Text to speak
        voice: Optional voice name (uses default if None)

    Returns:
        Path to WAV file, or None on failure
    """
    pass
synthesize_and_play(text: str, voice: Optional[str] = None, volume: Optional[float] = None, level_callback: Optional[Callable[[float], None]] = None) -> bool

Synthesize and play audio. Default implementation: synthesize then play file.

Source code in src/orateur/tts/base.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def synthesize_and_play(
    self,
    text: str,
    voice: Optional[str] = None,
    volume: Optional[float] = None,
    level_callback: Optional[Callable[[float], None]] = None,
) -> bool:
    """
    Synthesize and play audio. Default implementation: synthesize then play file.
    """
    wav = self.synthesize(text, voice)
    if wav:
        return self._play_file(wav, volume)
    return False

get_tts_backend(name: str, config) -> Optional[TTSBackend]

Get and initialize a TTS backend by name.

Source code in src/orateur/tts/registry.py
13
14
15
16
17
18
19
20
21
def get_tts_backend(name: str, config) -> Optional[TTSBackend]:
    """Get and initialize a TTS backend by name."""
    cls = _BACKENDS.get(name)
    if cls is None:
        return None
    backend = cls(config)
    if backend.initialize(config):
        return backend
    return None

list_tts_backends() -> list[str]

List registered TTS backend names.

Source code in src/orateur/tts/registry.py
24
25
26
def list_tts_backends() -> list[str]:
    """List registered TTS backend names."""
    return list(_BACKENDS.keys())

base

Abstract TTS backend interface.

TTSBackend(config: object)

Bases: ABC

Abstract base class for Text-to-Speech backends.

Subclasses store config as needed.

Source code in src/orateur/tts/base.py
11
12
def __init__(self, config: object) -> None:
    """Subclasses store ``config`` as needed."""
estimate_duration(text: str) -> float

Estimate TTS duration in seconds from text (heuristic). ~150 words/min ≈ 2.5 words/sec; fallback ~15 chars/sec (the code uses `len(text) / 15.0`).

Source code in src/orateur/tts/base.py
73
74
75
76
77
78
79
80
81
82
83
def estimate_duration(self, text: str) -> float:
    """
    Estimate TTS duration in seconds from text (heuristic).
    ~150 words/min ≈ 2.5 words/sec; fallback ~4 chars/sec.
    """
    if not text or not text.strip():
        return 0.0
    words = len(text.split())
    if words > 0:
        return words / 2.5
    return len(text) / 15.0
get_available_voices() -> list[str]

Return list of available voice names.

Source code in src/orateur/tts/base.py
89
90
91
def get_available_voices(self) -> list[str]:
    """Return list of available voice names."""
    return []
initialize(config) -> bool abstractmethod

Initialize the backend. Returns True on success.

Source code in src/orateur/tts/base.py
14
15
16
17
@abstractmethod
def initialize(self, config) -> bool:
    """Initialize the backend. Returns True on success."""
    pass
is_ready() -> bool

Check if backend is ready.

Source code in src/orateur/tts/base.py
85
86
87
def is_ready(self) -> bool:
    """Check if backend is ready."""
    return True
stop_playback() -> None

Stop in-flight playback if supported. Override in backends that play audio.

Source code in src/orateur/tts/base.py
93
94
95
def stop_playback(self) -> None:
    """Stop in-flight playback if supported. Override in backends that play audio."""
    pass
synthesize(text: str, voice: Optional[str] = None) -> Optional[Path] abstractmethod

Synthesize text to audio file.

Args: `text` — text to speak; `voice` — optional voice name (uses default if None)

Returns: Path to WAV file, or None on failure

Source code in src/orateur/tts/base.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@abstractmethod
def synthesize(
    self,
    text: str,
    voice: Optional[str] = None,
) -> Optional[Path]:
    """
    Synthesize text to audio file.

    Args:
        text: Text to speak
        voice: Optional voice name (uses default if None)

    Returns:
        Path to WAV file, or None on failure
    """
    pass
synthesize_and_play(text: str, voice: Optional[str] = None, volume: Optional[float] = None, level_callback: Optional[Callable[[float], None]] = None) -> bool

Synthesize and play audio. Default implementation: synthesize then play file.

Source code in src/orateur/tts/base.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def synthesize_and_play(
    self,
    text: str,
    voice: Optional[str] = None,
    volume: Optional[float] = None,
    level_callback: Optional[Callable[[float], None]] = None,
) -> bool:
    """
    Synthesize and play audio. Default implementation: synthesize then play file.
    """
    wav = self.synthesize(text, voice)
    if wav:
        return self._play_file(wav, volume)
    return False

pocket_tts

Pocket TTS backend.

PocketTTSBackend(config)

Bases: TTSBackend

Pocket TTS text-to-speech.

Source code in src/orateur/tts/pocket_tts.py
33
34
35
36
37
38
39
40
41
42
43
def __init__(self, config):
    super().__init__(config)
    self.config = config
    self._model = None
    self._voice_state_cache = {}
    self.ready = False
    self.voice = config.get_setting("tts_voice", "alba")
    self.volume = max(0.1, min(1.0, float(config.get_setting("tts_volume", 1.0))))
    self._playback_lock = threading.Lock()
    self._playback_proc: Optional[subprocess.Popen] = None
    self._stop_event = threading.Event()

registry

TTS backend registry.

get_tts_backend(name: str, config) -> Optional[TTSBackend]

Get and initialize a TTS backend by name.

Source code in src/orateur/tts/registry.py
13
14
15
16
17
18
19
20
21
def get_tts_backend(name: str, config) -> Optional[TTSBackend]:
    """Get and initialize a TTS backend by name."""
    cls = _BACKENDS.get(name)
    if cls is None:
        return None
    backend = cls(config)
    if backend.initialize(config):
        return backend
    return None
list_tts_backends() -> list[str]

List registered TTS backend names.

Source code in src/orateur/tts/registry.py
24
25
26
def list_tts_backends() -> list[str]:
    """List registered TTS backend names."""
    return list(_BACKENDS.keys())
register_tts_backend(name: str, backend_cls: Type[TTSBackend]) -> None

Register a new TTS backend.

Source code in src/orateur/tts/registry.py
29
30
31
def register_tts_backend(name: str, backend_cls: Type[TTSBackend]) -> None:
    """Register a new TTS backend."""
    _BACKENDS[name] = backend_cls

ui_daemon

Orateur UI daemon: JSON-RPC over FIFO (commands) and stdout (events).

Reads commands from ~/.cache/orateur/cmd.fifo, writes events to stdout. Used by the Quickshell OrateurWidget.

Use `orateur ui --events-only` with Quickshell when `orateur run` handles STT/TTS (one model load). A full `orateur ui` loads STT/TTS/LLM for FIFO-driven recording.

ui_mirror

Mirror UI events to ~/.cache/orateur/ui_events.jsonl when ui_events_mirror is enabled.

Any client can follow this file (e.g. Quickshell via `tail -F`, or the Tauri desktop app). Lines are NDJSON with the same shape as `orateur ui` stdout: {"event": "...", ...}.

reset_ui_events_file() -> None

Clear the JSONL file (unlink + recreate) so `tail -F` clients see a fresh file.

Source code in src/orateur/ui_mirror.py
38
39
40
41
42
43
44
45
46
def reset_ui_events_file() -> None:
    """Clear the JSONL file (unlink + recreate) so tail -F clients see a fresh file."""
    try:
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        if UI_EVENTS_JSONL.exists():
            UI_EVENTS_JSONL.unlink()
        UI_EVENTS_JSONL.write_text("", encoding="utf-8")
    except OSError as e:
        log.warning("Could not reset %s: %s", UI_EVENTS_JSONL, e)

send(config: ConfigManager, event: str, **payload: Any) -> None

Append one UI event line (non-blocking aside from a short file lock).

Source code in src/orateur/ui_mirror.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def send(config: ConfigManager, event: str, **payload: Any) -> None:
    """Append one UI event line (non-blocking aside from a short file lock)."""
    if not _mirror_enabled(config):
        return
    msg: dict[str, Any] = {"event": event}
    msg.update(payload)
    try:
        line = json.dumps(msg, separators=(",", ":"), default=_json_default)
    except TypeError as e:
        log.warning("ui_mirror JSON skip (%s): %s", event, e)
        return
    with _lock:
        try:
            CACHE_DIR.mkdir(parents=True, exist_ok=True)
            with open(UI_EVENTS_JSONL, "a", encoding="utf-8") as f:
                f.write(line + "\n")
                f.flush()
        except OSError as e:
            log.debug("ui_mirror append: %s", e)