From 70ec810f58fad418431c79958d33a3d77c4172e5 Mon Sep 17 00:00:00 2001 From: Peter Wood Date: Thu, 12 Mar 2026 11:18:25 -0400 Subject: [PATCH] feat: Add initial implementation of Plex Management TUI with backend support and styling --- plex/.gitignore | 3 + plex/tui/__init__.py | 0 plex/tui/app.py | 685 +++++++++++++++++++++++++++++++++++++++++ plex/tui/backend.py | 345 +++++++++++++++++++++ plex/tui/plex_tui.tcss | 279 +++++++++++++++++ 5 files changed, 1312 insertions(+) create mode 100644 plex/.gitignore create mode 100644 plex/tui/__init__.py create mode 100644 plex/tui/app.py create mode 100644 plex/tui/backend.py create mode 100644 plex/tui/plex_tui.tcss diff --git a/plex/.gitignore b/plex/.gitignore new file mode 100644 index 0000000..77ac754 --- /dev/null +++ b/plex/.gitignore @@ -0,0 +1,3 @@ +.venv/ +__pycache__/ +*.pyc diff --git a/plex/tui/__init__.py b/plex/tui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/plex/tui/app.py b/plex/tui/app.py new file mode 100644 index 0000000..7912c2a --- /dev/null +++ b/plex/tui/app.py @@ -0,0 +1,685 @@ +#!/usr/bin/env python3 +"""Plex Management TUI — a Textual-based terminal interface for Plex operations.""" + +from __future__ import annotations + +import asyncio +import sys +from pathlib import Path +from typing import Callable, Coroutine + +from textual import on, work +from textual.app import App, ComposeResult +from textual.containers import Horizontal, Vertical, VerticalScroll +from textual.screen import ModalScreen +from textual.widgets import ( + Button, + Footer, + Header, + Input, + Label, + ListItem, + ListView, + Log, + Static, +) + +from backend import CommandResult, run_command # noqa: local import +from backend import cache_sudo, is_sudo_failure # noqa: local import + +# ── Path to stylesheets ───────────────────────────────────────────────── +CSS_PATH = Path(__file__).parent / "plex_tui.tcss" + + +class NavListView(ListView): + """ListView that only changes selection on click/keyboard, not mouse hover.""" + + def on_mouse_move(self, event) -> None: + """Swallow mouse-move so the cursor doesn't follow the pointer.""" + event.stop() + +# ── Navigation items ──────────────────────────────────────────────────── + +NAV_SECTIONS: list[tuple[str, str]] = [ + ("service", "⏻ Service Control"), + ("libraries", "📚 Library Scanner"), + ("backup", "💾 Backup & Validate"), + ("monitor", "📊 Monitoring"), + ("database", "🗄️ Database Management"), + ("recovery", "🔧 Recovery"), + ("queries", "🔍 Queries & Stats"), + ("testing", "🧪 Testing"), +] + + +# ── Confirmation dialog ───────────────────────────────────────────────── + + +class ConfirmDialog(ModalScreen[bool]): + """A modal yes/no confirmation dialog.""" + + def __init__(self, title: str, body: str) -> None: + super().__init__() + self._title = title + self._body = body + + def compose(self) -> ComposeResult: + with Vertical(id="dialog-container"): + yield Label(self._title, id="dialog-title") + yield Static(self._body, id="dialog-body") + with Horizontal(classes="dialog-buttons"): + yield Button("Cancel", variant="default", id="dialog-cancel") + yield Button("Confirm", variant="error", id="dialog-confirm") + + @on(Button.Pressed, "#dialog-confirm") + def _confirm(self) -> None: + self.dismiss(True) + + @on(Button.Pressed, "#dialog-cancel") + def _cancel(self) -> None: + self.dismiss(False) + + +# ── Input dialog ───────────────────────────────────────────────────────── + + +class InputDialog(ModalScreen[str | None]): + """Modal dialog that asks for a text value.""" + + def __init__(self, title: str, body: str, placeholder: str = "") -> None: + super().__init__() + self._title = title + self._body = body + self._placeholder = placeholder + + def compose(self) -> ComposeResult: + with Vertical(id="input-dialog-container"): + yield Label(self._title, id="input-dialog-title") + yield Static(self._body, id="input-dialog-body") + yield Input(placeholder=self._placeholder, id="input-value") + with Horizontal(classes="dialog-buttons"): + yield Button("Cancel", variant="default", id="input-cancel") + yield Button("OK", variant="primary", id="input-ok") + + @on(Button.Pressed, "#input-ok") + def _ok(self) -> None: + value = self.query_one("#input-value", Input).value.strip() + self.dismiss(value if value else None) + + @on(Button.Pressed, "#input-cancel") + def _cancel(self) -> None: + self.dismiss(None) + + @on(Input.Submitted) + def _submit(self) -> None: + self._ok() + + +# ── Password dialog ────────────────────────────────────────────────────── + + +class PasswordDialog(ModalScreen[str | None]): + """Modal dialog that asks for a password (masked input).""" + + def __init__(self, title: str, body: str) -> None: + super().__init__() + self._title = title + self._body = body + + def compose(self) -> ComposeResult: + with Vertical(id="password-dialog-container"): + yield Label(self._title, id="password-dialog-title") + yield Static(self._body, id="password-dialog-body") + yield Input(placeholder="Password", password=True, id="password-value") + with Horizontal(classes="dialog-buttons"): + yield Button("Cancel", variant="default", id="password-cancel") + yield Button("Authenticate", variant="primary", id="password-ok") + + @on(Button.Pressed, "#password-ok") + def _ok(self) -> None: + value = self.query_one("#password-value", Input).value + self.dismiss(value if value else None) + + @on(Button.Pressed, "#password-cancel") + def _cancel(self) -> None: + self.dismiss(None) + + @on(Input.Submitted) + def _submit(self) -> None: + self._ok() + + +# ── Section panels ─────────────────────────────────────────────────────── + + +def _section_header(text: str) -> Static: + return Static(text, classes="section-header") + + +def _btn(label: str, btn_id: str, classes: str = "action-btn") -> Button: + return Button(label, id=btn_id, classes=classes) + + +class ServicePanel(Vertical): + def compose(self) -> ComposeResult: + yield _section_header("Plex Media Server — Service Control") + with Horizontal(classes="button-row"): + yield _btn("▶ Start", "svc-start", "action-btn-success") + yield _btn("⏹ Stop", "svc-stop", "action-btn-danger") + yield _btn("🔄 Restart", "svc-restart", "action-btn-warning") + with Horizontal(classes="button-row"): + yield _btn("ℹ️ Status", "svc-status") + + +class LibraryPanel(Vertical): + def compose(self) -> ComposeResult: + yield _section_header("Library Scanner") + with Horizontal(classes="button-row"): + yield _btn("📋 List Libraries", "lib-list") + yield _btn("🔍 Scan All", "lib-scan-all") + yield _btn("🔍 Scan Section…", "lib-scan-id") + with Horizontal(classes="button-row"): + yield _btn("🔄 Refresh All", "lib-refresh-all") + yield _btn("🔄 Refresh Section…", "lib-refresh-id") + yield _btn("⚡ Force Refresh All", "lib-force-refresh") + with Horizontal(classes="button-row"): + yield _btn("📊 Analyze All", "lib-analyze-all") + yield _btn("📊 Analyze Section…", "lib-analyze-id") + + +class BackupPanel(Vertical): + def compose(self) -> ComposeResult: + yield _section_header("Backup & Validation") + with Horizontal(classes="button-row"): + yield _btn("💾 Run Backup", "bak-run") + yield _btn("💾 Backup + Auto-Repair", "bak-run-repair") + yield _btn("🔍 Integrity Check", "bak-integrity") + with Horizontal(classes="button-row"): + yield _btn("📋 List Backups", "bak-list") + yield _btn("✅ Validate Latest", "bak-validate-latest") + yield _btn("📝 Validation Report", "bak-validate-report") + with Horizontal(classes="button-row"): + yield _btn("📦 Built-in Status", "bak-builtin") + yield _btn("📦 Built-in Detailed", "bak-builtin-detail") + + +class MonitorPanel(Vertical): + def compose(self) -> ComposeResult: + yield _section_header("Backup Monitoring Dashboard") + with Horizontal(classes="button-row"): + yield _btn("📊 Show Dashboard", "mon-dashboard") + + +class DatabasePanel(Vertical): + def compose(self) -> ComposeResult: + yield _section_header("Database Management") + with Horizontal(classes="button-row"): + yield _btn("🔍 Integrity Check", "db-check") + yield _btn("🔧 Gentle Repair", "db-repair-gentle", "action-btn-warning") + yield _btn("⚠️ Force Repair", "db-repair-force", "action-btn-danger") + with Horizontal(classes="button-row"): + yield _btn("🧹 Cleanup (dry-run)", "db-cleanup-dry") + yield _btn("🧹 Cleanup (apply)", "db-cleanup-apply", "action-btn-warning") + yield _btn("📥 Install/Update DBRepair", "db-install-dbrepair") + + +class RecoveryPanel(Vertical): + def compose(self) -> ComposeResult: + yield _section_header("Recovery Operations") + with Horizontal(classes="button-row"): + yield _btn("🔎 Verify Backup Only", "rec-verify") + yield _btn("🧪 Nuclear Dry-Run", "rec-nuclear-dry") + with Horizontal(classes="button-row"): + yield _btn("☢️ Nuclear Recovery", "rec-nuclear-auto", "action-btn-danger") + yield Static( + " ⚠ Nuclear recovery is a last resort — it replaces your entire database from backup.", + classes="status-warning", + ) + yield _section_header("Post-Recovery Validation") + with Horizontal(classes="button-row"): + yield _btn("⚡ Quick Validate", "rec-validate-quick") + yield _btn("🔍 Detailed Validate", "rec-validate-detailed") + yield _btn("📈 Performance Validate", "rec-validate-perf") + + +class QueryPanel(Vertical): + def compose(self) -> ComposeResult: + yield _section_header("Queries & Statistics") + with Horizontal(classes="button-row"): + yield _btn("🆕 Recent Additions (7d)", "qry-recent-7") + yield _btn("🆕 Recent Additions (30d)", "qry-recent-30") + yield _btn("🆕 Recent Additions…", "qry-recent-custom") + with Horizontal(classes="button-row"): + yield _btn("📊 Library Stats", "qry-stats") + yield _btn("🔢 Media Counts", "qry-counts") + yield _btn("📋 List Libraries", "qry-libraries") + with Horizontal(classes="button-row"): + yield _btn("💬 Custom SQL Query…", "qry-custom") + + +class TestingPanel(Vertical): + def compose(self) -> ComposeResult: + yield _section_header("Testing & Diagnostics") + with Horizontal(classes="button-row"): + yield _btn("⚡ Quick Smoke Tests", "test-quick") + yield _btn("🧪 Unit Tests", "test-unit") + yield _btn("🔗 Integration Tests", "test-integration") + with Horizontal(classes="button-row"): + yield _btn("🧹 Cleanup Test Artifacts", "test-cleanup") + + +# Map section key -> panel class +PANELS: dict[str, type] = { + "service": ServicePanel, + "libraries": LibraryPanel, + "backup": BackupPanel, + "monitor": MonitorPanel, + "database": DatabasePanel, + "recovery": RecoveryPanel, + "queries": QueryPanel, + "testing": TestingPanel, +} + + +# ── Main application ──────────────────────────────────────────────────── + + +class PlexTUI(App): + """Plex Management TUI.""" + + TITLE = "Plex Management Console" + CSS_PATH = CSS_PATH + BINDINGS = [ + ("q", "quit", "Quit"), + ("d", "toggle_dark", "Toggle Dark"), + ("c", "clear_log", "Clear Log"), + ("s", "authenticate_sudo", "Sudo Auth"), + ("1", "nav('service')", "Service"), + ("2", "nav('libraries')", "Libraries"), + ("3", "nav('backup')", "Backup"), + ("4", "nav('monitor')", "Monitor"), + ("5", "nav('database')", "Database"), + ("6", "nav('recovery')", "Recovery"), + ("7", "nav('queries')", "Queries"), + ("8", "nav('testing')", "Testing"), + ] + + def __init__(self) -> None: + super().__init__() + self._current_section = "service" + + # ── Composition ─────────────────────────────────────────────────── + + def compose(self) -> ComposeResult: + yield Header() + with Horizontal(): + with Vertical(id="sidebar"): + yield Static("Plex Manager", id="sidebar-title") + yield NavListView( + *[ + ListItem(Label(label, classes="nav-label"), id=f"nav-{key}") + for key, label in NAV_SECTIONS + ], + id="nav-list", + ) + with Vertical(id="main-content"): + # A scrollable area where the active section panel lives + yield VerticalScroll(ServicePanel(), id="panel-area") + yield Log(id="output-log", highlight=True, auto_scroll=True) + yield Footer() + + def on_mount(self) -> None: + self._select_nav(0) + # Disable any-event mouse tracking (mode 1003) to prevent hover effects. + # Basic click tracking (mode 1000) remains active. + self.set_timer(0.1, self._disable_mouse_move_tracking) + + def _disable_mouse_move_tracking(self) -> None: + sys.stdout.write("\x1b[?1003l") + sys.stdout.flush() + + # ── Navigation ──────────────────────────────────────────────────── + + def _select_nav(self, index: int) -> None: + nav_list = self.query_one("#nav-list", ListView) + nav_list.index = index + + @on(ListView.Selected, "#nav-list") + def _on_nav_selected(self, event: ListView.Selected) -> None: + item_id = event.item.id or "" + section_key = item_id.removeprefix("nav-") + if section_key in PANELS: + self._switch_section(section_key) + + def _switch_section(self, key: str) -> None: + if key == self._current_section: + return + self._current_section = key + panel_area = self.query_one("#panel-area", VerticalScroll) + panel_area.remove_children() + panel_area.mount(PANELS[key]()) + + def action_nav(self, section: str) -> None: + keys = [k for k, _ in NAV_SECTIONS] + if section in keys: + self._select_nav(keys.index(section)) + self._switch_section(section) + + def action_clear_log(self) -> None: + self.query_one("#output-log", Log).clear() + + # ── Sudo authentication ───────────────────────────────────── + + def action_authenticate_sudo(self) -> None: + self._prompt_sudo() + + def _prompt_sudo(self) -> None: + async def _cb(password: str | None) -> None: + if password is not None: + success = await cache_sudo(password) + if success: + self._log("🔓 Sudo credentials cached successfully.") + else: + self._log("✗ Sudo authentication failed. Wrong password?") + + self.push_screen( + PasswordDialog( + "🔒 Sudo Authentication", + "Enter your password to cache sudo credentials:", + ), + _cb, # type: ignore[arg-type] + ) + + # ── Logging helper ──────────────────────────────────────────────── + + def _log(self, text: str) -> None: + log_widget = self.query_one("#output-log", Log) + log_widget.write_line(text) + + def _log_result(self, label: str, result: CommandResult) -> None: + status = "✓" if result.ok else "✗" + self._log(f"[{status}] {label}") + if result.output: + for line in result.output.splitlines(): + self._log(f" {line}") + self._log("") + + # ── Async operation runner ──────────────────────────────────────── + + @work(thread=False) + async def _run_op( + self, + label: str, + coro: Coroutine[None, None, CommandResult], + ) -> None: + self._log(f"⏳ {label}...") + result = await coro + if is_sudo_failure(result): + self._log(f"[✗] {label}") + self._log(" 🔒 This command requires sudo. Press 's' to authenticate, then try again.") + self._log("") + else: + self._log_result(label, result) + + def _ask_section_then_run( + self, + title: str, + op: Callable[[str], Coroutine[None, None, CommandResult]], + ) -> None: + """Prompt for a section ID, then run an async operation with it.""" + + async def _do(dialog_result: str | None) -> None: + if dialog_result is not None: + self._run_op(f"{title} (section {dialog_result})", op(dialog_result)) + + self.app.push_screen( + InputDialog(title, "Enter library section ID:", placeholder="e.g. 1"), + _do, # type: ignore[arg-type] + ) + + # ── Button dispatch ─────────────────────────────────────────────── + + @on(Button.Pressed) + def _on_button(self, event: Button.Pressed) -> None: + from backend import ( # local import to keep top-level light + backup_builtin_detailed, + backup_builtin_status, + backup_integrity_check, + backup_list, + backup_run, + backup_validate, + backup_validate_report, + custom_query, + db_check, + db_cleanup, + db_install_dbrepair, + db_repair_force, + db_repair_gentle, + library_analyze, + library_list, + library_refresh, + library_scan, + library_stats, + list_libraries_query, + media_counts, + monitor_dashboard, + nuclear_recovery_auto, + nuclear_recovery_dry_run, + nuclear_recovery_verify, + plex_restart, + plex_start, + plex_status, + plex_stop, + recent_additions, + run_tests_cleanup, + run_tests_integration, + run_tests_quick, + run_tests_unit, + validate_recovery, + ) + + bid = event.button.id or "" + + # ── Service ──────────────────────────────── + if bid == "svc-start": + self._run_op("Start Plex", plex_start()) + elif bid == "svc-stop": + self._confirm_then_run( + "Stop Plex?", + "This will stop the Plex Media Server service.", + "Stop Plex", + plex_stop(), + ) + elif bid == "svc-restart": + self._confirm_then_run( + "Restart Plex?", + "This will restart the Plex Media Server service.", + "Restart Plex", + plex_restart(), + ) + elif bid == "svc-status": + self._run_op("Plex Status", plex_status()) + + # ── Libraries ────────────────────────────── + elif bid == "lib-list": + self._run_op("List Libraries", library_list()) + elif bid == "lib-scan-all": + self._run_op("Scan All Libraries", library_scan()) + elif bid == "lib-scan-id": + self._ask_section_then_run("Scan Library", library_scan) + elif bid == "lib-refresh-all": + self._run_op("Refresh All Libraries", library_refresh()) + elif bid == "lib-refresh-id": + self._ask_section_then_run("Refresh Library", library_refresh) + elif bid == "lib-force-refresh": + self._run_op("Force Refresh All", library_refresh(force=True)) + elif bid == "lib-analyze-all": + self._run_op("Analyze All Libraries", library_analyze()) + elif bid == "lib-analyze-id": + self._ask_section_then_run("Analyze Library", library_analyze) + + # ── Backup ───────────────────────────────── + elif bid == "bak-run": + self._run_op("Run Backup", backup_run()) + elif bid == "bak-run-repair": + self._run_op("Backup + Auto-Repair", backup_run(auto_repair=True)) + elif bid == "bak-integrity": + self._run_op("Integrity Check", backup_integrity_check()) + elif bid == "bak-list": + self._run_op("List Backups", backup_list()) + elif bid == "bak-validate-latest": + self._run_op("Validate Latest Backup", backup_validate(latest_only=True)) + elif bid == "bak-validate-report": + self._run_op("Full Validation Report", backup_validate_report()) + elif bid == "bak-builtin": + self._run_op("Built-in Backup Status", backup_builtin_status()) + elif bid == "bak-builtin-detail": + self._run_op("Built-in Backup (Detailed)", backup_builtin_detailed()) + + # ── Monitor ──────────────────────────────── + elif bid == "mon-dashboard": + self._run_op("Monitoring Dashboard", monitor_dashboard()) + + # ── Database ─────────────────────────────── + elif bid == "db-check": + self._run_op("Database Integrity Check", db_check()) + elif bid == "db-repair-gentle": + self._confirm_then_run( + "Gentle DB Repair?", + "This will attempt a gentle repair of the Plex database.", + "Gentle DB Repair", + db_repair_gentle(), + ) + elif bid == "db-repair-force": + self._confirm_then_run( + "Force DB Repair?", + "This will aggressively repair the Plex database. " + "The Plex service will be stopped during repair.", + "Force DB Repair", + db_repair_force(), + ) + elif bid == "db-cleanup-dry": + self._run_op("DB Cleanup (dry-run)", db_cleanup(dry_run=True)) + elif bid == "db-cleanup-apply": + self._confirm_then_run( + "Apply DB Cleanup?", + "This will permanently remove temporary and recovery files " + "from the Plex database directory.", + "DB Cleanup", + db_cleanup(dry_run=False), + ) + elif bid == "db-install-dbrepair": + self._run_op("Install/Update DBRepair", db_install_dbrepair()) + + # ── Recovery ─────────────────────────────── + elif bid == "rec-verify": + self._run_op("Verify Backup Integrity", nuclear_recovery_verify()) + elif bid == "rec-nuclear-dry": + self._run_op("Nuclear Recovery (dry-run)", nuclear_recovery_dry_run()) + elif bid == "rec-nuclear-auto": + self._confirm_then_run( + "☢️ NUCLEAR RECOVERY", + "This will REPLACE YOUR ENTIRE DATABASE from the best available backup.\n\n" + "This is a LAST RESORT operation. Plex will be stopped during recovery.\n" + "Are you absolutely sure?", + "Nuclear Recovery", + nuclear_recovery_auto(), + ) + elif bid == "rec-validate-quick": + self._run_op("Quick Recovery Validation", validate_recovery("--quick")) + elif bid == "rec-validate-detailed": + self._run_op( + "Detailed Recovery Validation", validate_recovery("--detailed") + ) + elif bid == "rec-validate-perf": + self._run_op( + "Performance Recovery Validation", validate_recovery("--performance") + ) + + # ── Queries ──────────────────────────────── + elif bid == "qry-recent-7": + self._run_op("Recent Additions (7 days)", recent_additions(7)) + elif bid == "qry-recent-30": + self._run_op("Recent Additions (30 days)", recent_additions(30)) + elif bid == "qry-recent-custom": + self._ask_days_then_run() + elif bid == "qry-stats": + self._run_op("Library Stats", library_stats()) + elif bid == "qry-counts": + self._run_op("Media Counts", media_counts()) + elif bid == "qry-libraries": + self._run_op("List Libraries", list_libraries_query()) + elif bid == "qry-custom": + self._ask_sql_then_run() + + # ── Testing ──────────────────────────────── + elif bid == "test-quick": + self._run_op("Quick Smoke Tests", run_tests_quick()) + elif bid == "test-unit": + self._run_op("Unit Tests", run_tests_unit()) + elif bid == "test-integration": + self._run_op("Integration Tests", run_tests_integration()) + elif bid == "test-cleanup": + self._run_op("Cleanup Test Artifacts", run_tests_cleanup()) + + # ── Confirmation helper ─────────────────────────────────────────── + + def _confirm_then_run( + self, + title: str, + body: str, + label: str, + coro: Coroutine[None, None, CommandResult], + ) -> None: + async def _callback(confirmed: bool) -> None: + if confirmed: + self._run_op(label, coro) + + self.push_screen(ConfirmDialog(title, body), _callback) # type: ignore[arg-type] + + # ── Input prompt helpers ────────────────────────────────────────── + + def _ask_days_then_run(self) -> None: + from backend import recent_additions + + async def _cb(val: str | None) -> None: + if val is not None and val.isdigit(): + self._run_op( + f"Recent Additions ({val} days)", recent_additions(int(val)) + ) + + self.push_screen( + InputDialog( + "Recent Additions", + "Enter number of days:", + placeholder="e.g. 14", + ), + _cb, # type: ignore[arg-type] + ) + + def _ask_sql_then_run(self) -> None: + from backend import custom_query + + async def _cb(val: str | None) -> None: + if val is not None: + self._run_op(f"Custom Query", custom_query(val)) + + self.push_screen( + InputDialog( + "Custom SQL Query", + "Enter a SQL query to run against the Plex database:", + placeholder="SELECT count(*) FROM metadata_items", + ), + _cb, # type: ignore[arg-type] + ) + + +# ── Entry point ────────────────────────────────────────────────────────── + +def main() -> None: + app = PlexTUI() + app.run() + + +if __name__ == "__main__": + main() diff --git a/plex/tui/backend.py b/plex/tui/backend.py new file mode 100644 index 0000000..fe45d2e --- /dev/null +++ b/plex/tui/backend.py @@ -0,0 +1,345 @@ +"""Plex Management TUI — backend helpers for running shell scripts.""" + +from __future__ import annotations + +import asyncio +import os +import shlex +from dataclasses import dataclass, field +from pathlib import Path + +SCRIPT_DIR = Path(__file__).resolve().parent.parent # /home/…/shell/plex + + +@dataclass +class CommandResult: + returncode: int + stdout: str + stderr: str + command: str + + @property + def ok(self) -> bool: + return self.returncode == 0 + + @property + def output(self) -> str: + text = self.stdout + if self.stderr: + text += "\n" + self.stderr + return text.strip() + + +def _script(name: str) -> str: + """Return absolute path to a script in the plex directory.""" + return str(SCRIPT_DIR / name) + + +# ── Ansi stripping ────────────────────────────────────────────────────── + +import re + +_ANSI_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]") + + +def strip_ansi(text: str) -> str: + return _ANSI_RE.sub("", text) + + +# ── Async command runner ──────────────────────────────────────────────── + +async def run_command( + cmd: str | list[str], + *, + sudo: bool = False, + timeout: int = 300, +) -> CommandResult: + """Run a shell command asynchronously and return the result.""" + if isinstance(cmd, list): + shell_cmd = " ".join(shlex.quote(c) for c in cmd) + else: + shell_cmd = cmd + + if sudo: + shell_cmd = f"sudo {shell_cmd}" + + env = os.environ.copy() + env["TERM"] = "dumb" # suppress colour in child scripts + + try: + proc = await asyncio.create_subprocess_shell( + shell_cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + stdin=asyncio.subprocess.DEVNULL, + env=env, + ) + stdout_bytes, stderr_bytes = await asyncio.wait_for( + proc.communicate(), timeout=timeout + ) + return CommandResult( + returncode=proc.returncode or 0, + stdout=strip_ansi(stdout_bytes.decode(errors="replace")), + stderr=strip_ansi(stderr_bytes.decode(errors="replace")), + command=shell_cmd, + ) + except asyncio.TimeoutError: + return CommandResult( + returncode=-1, + stdout="", + stderr=f"Command timed out after {timeout}s", + command=shell_cmd, + ) + except Exception as exc: + return CommandResult( + returncode=-1, + stdout="", + stderr=str(exc), + command=shell_cmd, + ) + + +# ── Sudo helpers ──────────────────────────────────────────────────────── + +_SUDO_FAIL_PATTERNS = [ + "sudo: a password is required", + "sudo: a terminal is required", + "sudo: no tty present", +] + + +def is_sudo_failure(result: CommandResult) -> bool: + """Return True if the command failed because of missing sudo credentials.""" + if result.ok: + return False + text = (result.stdout + " " + result.stderr).lower() + return any(p in text for p in _SUDO_FAIL_PATTERNS) + + +async def check_sudo_cached() -> bool: + """Check whether sudo credentials are currently cached (no password needed).""" + try: + proc = await asyncio.create_subprocess_exec( + "sudo", "-n", "true", + stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL, + ) + await proc.wait() + return proc.returncode == 0 + except Exception: + return False + + +async def cache_sudo(password: str) -> bool: + """Cache sudo credentials by validating the given password.""" + try: + proc = await asyncio.create_subprocess_exec( + "sudo", "-S", "-v", + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL, + ) + await proc.communicate(input=(password + "\n").encode()) + return proc.returncode == 0 + except Exception: + return False + + +# ── High-level operations (each returns a CommandResult) ──────────────── + +# Service management ───────────────────────────────────────────────────── + +async def plex_start() -> CommandResult: + return await run_command(f"bash {_script('plex.sh')} start") + + +async def plex_stop() -> CommandResult: + return await run_command(f"bash {_script('plex.sh')} stop") + + +async def plex_restart() -> CommandResult: + return await run_command(f"bash {_script('plex.sh')} restart") + + +async def plex_status() -> CommandResult: + return await run_command(f"bash {_script('plex.sh')} status") + + +# Library scanning ─────────────────────────────────────────────────────── + +async def library_list() -> CommandResult: + return await run_command(f"bash {_script('scan-plex-libraries.sh')} list") + + +async def library_scan(section_id: str = "") -> CommandResult: + cmd = f"bash {_script('scan-plex-libraries.sh')} scan" + if section_id: + cmd += f" {shlex.quote(section_id)}" + return await run_command(cmd) + + +async def library_refresh(section_id: str = "", force: bool = False) -> CommandResult: + cmd = f"bash {_script('scan-plex-libraries.sh')} refresh" + if section_id: + cmd += f" {shlex.quote(section_id)}" + else: + cmd += ' ""' + if force: + cmd += " true" + return await run_command(cmd) + + +async def library_analyze(section_id: str = "") -> CommandResult: + cmd = f"bash {_script('scan-plex-libraries.sh')} analyze" + if section_id: + cmd += f" {shlex.quote(section_id)}" + return await run_command(cmd) + + +# Backup ───────────────────────────────────────────────────────────────── + +async def backup_run(auto_repair: bool = False) -> CommandResult: + cmd = f"bash {_script('backup-plex.sh')} --non-interactive" + if auto_repair: + cmd += " --auto-repair" + return await run_command(cmd, timeout=600) + + +async def backup_integrity_check() -> CommandResult: + return await run_command( + f"bash {_script('backup-plex.sh')} --check-integrity --non-interactive" + ) + + +async def backup_validate(latest_only: bool = True) -> CommandResult: + cmd = f"bash {_script('validate-plex-backups.sh')}" + if latest_only: + cmd += " --latest" + return await run_command(cmd) + + +async def backup_validate_report() -> CommandResult: + return await run_command(f"bash {_script('validate-plex-backups.sh')} --report") + + +async def backup_list() -> CommandResult: + return await run_command(f"bash {_script('plex.sh')} backups") + + +async def backup_builtin_status() -> CommandResult: + return await run_command(f"bash {_script('check-plex-builtin-backups.sh')}") + + +async def backup_builtin_detailed() -> CommandResult: + return await run_command(f"bash {_script('check-plex-builtin-backups.sh')} --detailed") + + +# Monitoring ────────────────────────────────────────────────────────────── + +async def monitor_dashboard() -> CommandResult: + return await run_command(f"bash {_script('monitor-plex-backup.sh')}") + + +# Database management ──────────────────────────────────────────────────── + +async def db_check() -> CommandResult: + return await run_command(f"bash {_script('plex-db-manager.sh')} check") + + +async def db_repair_gentle() -> CommandResult: + return await run_command(f"bash {_script('plex-db-manager.sh')} repair --gentle") + + +async def db_repair_force() -> CommandResult: + return await run_command(f"bash {_script('plex-db-manager.sh')} repair --force") + + +async def db_cleanup(dry_run: bool = True) -> CommandResult: + cmd = f"bash {_script('cleanup-plex-databases.sh')}" + if dry_run: + cmd += " --dry-run" + cmd += " --verbose" + return await run_command(cmd) + + +async def db_install_dbrepair() -> CommandResult: + return await run_command(f"bash {_script('plex.sh')} install-dbrepair") + + +# Recovery ──────────────────────────────────────────────────────────────── + +async def nuclear_recovery_dry_run() -> CommandResult: + return await run_command( + f"bash {_script('nuclear-plex-recovery.sh')} --dry-run" + ) + + +async def nuclear_recovery_auto() -> CommandResult: + return await run_command( + f"bash {_script('nuclear-plex-recovery.sh')} --auto", timeout=600 + ) + + +async def nuclear_recovery_verify() -> CommandResult: + return await run_command( + f"bash {_script('nuclear-plex-recovery.sh')} --verify-only" + ) + + +async def validate_recovery(mode: str = "--quick") -> CommandResult: + return await run_command( + f"bash {_script('validate-plex-recovery.sh')} {mode}" + ) + + +# Queries ───────────────────────────────────────────────────────────────── + +async def recent_additions(days: int = 7) -> CommandResult: + return await run_command( + f"bash {_script('plex-recent-additions.sh')} recent {days}" + ) + + +async def library_stats() -> CommandResult: + return await run_command(f"bash {_script('plex-recent-additions.sh')} stats") + + +async def media_counts() -> CommandResult: + return await run_command(f"bash {_script('plex-recent-additions.sh')} count") + + +async def list_libraries_query() -> CommandResult: + return await run_command(f"bash {_script('plex-recent-additions.sh')} libraries") + + +async def custom_query(sql: str) -> CommandResult: + return await run_command( + f"bash {_script('plex-recent-additions.sh')} custom {shlex.quote(sql)}" + ) + + +# Testing ───────────────────────────────────────────────────────────────── + +async def run_tests_quick() -> CommandResult: + return await run_command( + f"bash {_script('test-plex-backup.sh')} --quick", timeout=120 + ) + + +async def run_tests_unit() -> CommandResult: + return await run_command( + f"bash {_script('test-plex-backup.sh')} --unit", timeout=300 + ) + + +async def run_tests_integration() -> CommandResult: + return await run_command( + f"bash {_script('integration-test-plex.sh')} --quick", timeout=300 + ) + + +async def run_tests_cleanup() -> CommandResult: + return await run_command( + f"bash {_script('test-plex-backup.sh')} --cleanup" + ) diff --git a/plex/tui/plex_tui.tcss b/plex/tui/plex_tui.tcss new file mode 100644 index 0000000..4d366f8 --- /dev/null +++ b/plex/tui/plex_tui.tcss @@ -0,0 +1,279 @@ +/* Plex TUI Theme - Orange/Dark inspired by Plex branding */ + +Screen { + background: $surface; +} + +Header { + background: #282828; + color: #e5a00d; +} + +Footer { + background: #282828; +} + +/* Sidebar navigation */ +#sidebar { + width: 32; + background: #1a1a2e; + border-right: solid #e5a00d; + padding: 1 0; +} + +#sidebar-title { + text-align: center; + text-style: bold; + color: #e5a00d; + padding: 0 1; + margin-bottom: 1; +} + +#nav-list { + background: transparent; +} + +#nav-list > ListItem { + padding: 0 1; + height: 3; + background: transparent; +} + +#nav-list > ListItem:hover { + background: transparent; +} + +#nav-list > ListItem.-active { + background: #e5a00d 20%; +} + +.nav-label { + padding: 1 2; + width: 100%; +} + +/* Main content */ +#main-content { + padding: 1 2; +} + +/* Section headers */ +.section-header { + text-style: bold; + color: #e5a00d; + padding: 0 0 1 0; + text-align: center; +} + +/* Status panel */ +#status-panel { + height: auto; + max-height: 12; + border: solid #444; + margin-bottom: 1; + padding: 1; +} + +/* Action buttons */ +.action-btn { + margin: 0 1 1 0; + min-width: 24; +} + +.action-btn:hover { + opacity: 1.0; +} + +.action-btn-danger { + margin: 0 1 1 0; + min-width: 24; + background: $error; +} + +.action-btn-danger:hover { + background: $error; +} + +.action-btn-warning { + margin: 0 1 1 0; + min-width: 24; + background: $warning; +} + +.action-btn-warning:hover { + background: $warning; +} + +.action-btn-success { + margin: 0 1 1 0; + min-width: 24; + background: $success; +} + +.action-btn-success:hover { + background: $success; +} + +/* Suppress default button hover tint */ +Button:hover { + opacity: 1.0; +} + +/* Button rows */ +.button-row { + layout: horizontal; + height: auto; + padding: 0 0 1 0; +} + +/* Output log */ +#output-log { + border: solid #444; + height: 1fr; + margin-top: 1; +} + +/* DataTable */ +DataTable { + height: auto; + max-height: 20; + margin-bottom: 1; +} + +/* Confirmation dialog */ +ConfirmDialog { + align: center middle; +} + +ConfirmDialog > #dialog-container { + width: 60; + height: auto; + border: thick $error; + background: $surface; + padding: 1 2; +} + +ConfirmDialog #dialog-title { + text-style: bold; + color: $error; + text-align: center; + margin-bottom: 1; +} + +ConfirmDialog #dialog-body { + margin-bottom: 1; +} + +ConfirmDialog .dialog-buttons { + layout: horizontal; + align-horizontal: center; + height: auto; +} + +ConfirmDialog .dialog-buttons Button { + margin: 0 2; +} + +/* Input dialog */ +InputDialog { + align: center middle; +} + +InputDialog > #input-dialog-container { + width: 60; + height: auto; + border: thick #e5a00d; + background: $surface; + padding: 1 2; +} + +InputDialog #input-dialog-title { + text-style: bold; + color: #e5a00d; + text-align: center; + margin-bottom: 1; +} + +InputDialog #input-dialog-body { + margin-bottom: 1; +} + +InputDialog .dialog-buttons { + layout: horizontal; + align-horizontal: center; + height: auto; +} + +InputDialog .dialog-buttons Button { + margin: 0 2; +} + +/* Password dialog */ +PasswordDialog { + align: center middle; +} + +PasswordDialog > #password-dialog-container { + width: 60; + height: auto; + border: thick #e5a00d; + background: $surface; + padding: 1 2; +} + +PasswordDialog #password-dialog-title { + text-style: bold; + color: #e5a00d; + text-align: center; + margin-bottom: 1; +} + +PasswordDialog #password-dialog-body { + margin-bottom: 1; +} + +PasswordDialog .dialog-buttons { + layout: horizontal; + align-horizontal: center; + height: auto; +} + +PasswordDialog .dialog-buttons Button { + margin: 0 2; +} + +/* Tabs */ +TabbedContent { + height: 1fr; +} + +TabPane { + padding: 1; +} + +/* Info cards */ +.info-card { + background: #1a1a2e; + border: solid #444; + padding: 1; + margin-bottom: 1; + height: auto; +} + +.info-card-title { + text-style: bold; + color: #e5a00d; +} + +/* Status indicators */ +.status-ok { + color: $success; +} + +.status-error { + color: $error; +} + +.status-warning { + color: $warning; +}