# Mirror of https://github.com/acedanger/shell.git
# Synced 2026-03-24 19:11:48 -07:00 (346 lines, 11 KiB, Python)
"""Plex Management TUI — backend helpers for running shell scripts."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import shlex
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
|
|
# Directory that holds the plex shell scripts: the parent of the directory
# containing this module.
SCRIPT_DIR = Path(__file__).resolve().parents[1]  # /home/…/shell/plex
|
|
|
|
|
|
@dataclass
class CommandResult:
    """Outcome of one command invocation: exit status, captured streams, command line."""

    returncode: int
    stdout: str
    stderr: str
    command: str

    @property
    def ok(self) -> bool:
        """True when the exit status is zero."""
        return not self.returncode != 0

    @property
    def output(self) -> str:
        """Stdout, followed by stderr on a new line when stderr is non-empty, stripped."""
        combined = self.stdout + "\n" + self.stderr if self.stderr else self.stdout
        return combined.strip()
|
|
|
|
|
|
def _script(name: str) -> str:
    """Return the absolute path, as a string, of *name* inside ``SCRIPT_DIR``."""
    script_path = SCRIPT_DIR.joinpath(name)
    return str(script_path)
|
|
|
|
|
|
# ── ANSI stripping ──────────────────────────────────────────────────────
|
|
|
|
import re
|
|
|
|
# Matches terminal escape sequences introduced by ESC (0x1b).  The OSC
# alternative must precede the two-character one, because "]" on its own is
# also a valid two-character escape.
_ANSI_RE = re.compile(
    r"\x1b(?:"
    r"\[[0-?]*[ -/]*[@-~]"               # CSI: colours, cursor moves, ESC[?25l, ...
    r"|\][^\x07\x1b]*(?:\x07|\x1b\\)"    # OSC: e.g. window title, ends with BEL or ST
    r"|[ -/]+[0-~]"                      # intermediate-byte escapes, e.g. ESC ( B
    r"|[@-Z\\-_]"                        # remaining two-character escapes
    r")"
)


def strip_ansi(text: str) -> str:
    """Return *text* with terminal escape sequences removed.

    Covers everything the previous digits-and-semicolons CSI pattern matched,
    plus private-mode CSI sequences, OSC strings, charset selections and
    two-character escapes.
    """
    return _ANSI_RE.sub("", text)
|
|
|
|
|
|
# ── Async command runner ────────────────────────────────────────────────
|
|
|
|
async def run_command(
    cmd: str | list[str],
    *,
    sudo: bool = False,
    timeout: int = 300,
) -> CommandResult:
    """Run a shell command asynchronously and return the result.

    Args:
        cmd: Either a ready-made shell command string (used verbatim) or a
            list of arguments, each of which is shell-quoted and joined
            with spaces.
        sudo: When true, the command line is prefixed with ``sudo``.
        timeout: Seconds to wait for the command to finish.

    Returns:
        A ``CommandResult`` with ANSI-stripped stdout/stderr.  A timeout or
        any failure to launch/communicate is reported as ``returncode=-1``
        with the reason in ``stderr``; this function does not raise for
        those cases.
    """
    if isinstance(cmd, list):
        shell_cmd = " ".join(shlex.quote(c) for c in cmd)
    else:
        shell_cmd = cmd

    if sudo:
        shell_cmd = f"sudo {shell_cmd}"

    env = os.environ.copy()
    env["TERM"] = "dumb"  # suppress colour in child scripts

    proc = None
    try:
        proc = await asyncio.create_subprocess_shell(
            shell_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,  # children must never block on input
            env=env,
        )
        stdout_bytes, stderr_bytes = await asyncio.wait_for(
            proc.communicate(), timeout=timeout
        )
        return CommandResult(
            returncode=proc.returncode or 0,
            stdout=strip_ansi(stdout_bytes.decode(errors="replace")),
            stderr=strip_ansi(stderr_bytes.decode(errors="replace")),
            command=shell_cmd,
        )
    except asyncio.TimeoutError:
        # wait_for() only cancels communicate(); without an explicit kill the
        # child would keep running after we report the timeout.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except OSError:
                # Already exited, or not ours to signal (e.g. running under
                # sudo).  Either way the timeout result below still stands.
                pass
            # NOTE(review): only the spawned shell is signalled; processes it
            # started may outlive it.  We also do not await proc.wait() here
            # so the timeout path itself can never block.
        return CommandResult(
            returncode=-1,
            stdout="",
            stderr=f"Command timed out after {timeout}s",
            command=shell_cmd,
        )
    except Exception as exc:
        # Deliberate catch-all: callers expect a CommandResult, not a raise.
        return CommandResult(
            returncode=-1,
            stdout="",
            stderr=str(exc),
            command=shell_cmd,
        )
|
|
|
|
|
|
# ── Sudo helpers ────────────────────────────────────────────────────────
|
|
|
|
# Lower-case fragments of sudo's own error messages; matched against
# lower-cased command output.
_SUDO_FAIL_PATTERNS = [
    "sudo: a password is required",
    "sudo: a terminal is required",
    "sudo: no tty present",
]


def is_sudo_failure(result: CommandResult) -> bool:
    """Tell whether a failed command's output contains a known sudo credential error.

    Always False for a successful result; otherwise stdout and stderr are
    searched case-insensitively for any entry of ``_SUDO_FAIL_PATTERNS``.
    """
    if result.ok:
        return False

    haystack = " ".join((result.stdout, result.stderr)).lower()
    for marker in _SUDO_FAIL_PATTERNS:
        if marker in haystack:
            return True
    return False
|
|
|
|
|
|
async def check_sudo_cached() -> bool:
    """Report whether ``sudo -n true`` exits with status 0.

    All three standard streams are attached to DEVNULL.  Any exception while
    spawning or waiting is treated as "not cached" and yields False.
    """
    devnull = asyncio.subprocess.DEVNULL
    try:
        probe = await asyncio.create_subprocess_exec(
            "sudo", "-n", "true",
            stdin=devnull,
            stdout=devnull,
            stderr=devnull,
        )
        exit_status = await probe.wait()
    except Exception:
        return False
    return exit_status == 0
|
|
|
|
|
|
async def cache_sudo(password: str) -> bool:
    """Feed *password* to ``sudo -S -v`` on stdin and report whether it exited 0.

    Stdout and stderr of sudo are discarded.  Any exception along the way
    yields False.
    """
    devnull = asyncio.subprocess.DEVNULL
    try:
        sudo_proc = await asyncio.create_subprocess_exec(
            "sudo", "-S", "-v",
            stdin=asyncio.subprocess.PIPE,
            stdout=devnull,
            stderr=devnull,
        )
        # The password is sent as a single newline-terminated line.
        payload = (password + "\n").encode()
        await sudo_proc.communicate(input=payload)
        accepted = sudo_proc.returncode == 0
    except Exception:
        return False
    return accepted
|
|
|
|
|
|
# ── High-level operations (each returns a CommandResult) ────────────────
|
|
|
|
# Service management ─────────────────────────────────────────────────────
|
|
|
|
async def plex_start() -> CommandResult:
    """Run ``plex.sh start`` through bash and return the captured result."""
    # Argument list rather than an f-string: run_command shell-quotes each
    # element, so a script path containing spaces stays one argument.
    return await run_command(["bash", _script("plex.sh"), "start"])
|
|
|
|
|
|
async def plex_stop() -> CommandResult:
    """Run ``plex.sh stop`` through bash and return the captured result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(["bash", _script("plex.sh"), "stop"])
|
|
|
|
|
|
async def plex_restart() -> CommandResult:
    """Run ``plex.sh restart`` through bash and return the captured result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(["bash", _script("plex.sh"), "restart"])
|
|
|
|
|
|
async def plex_status() -> CommandResult:
    """Run ``plex.sh status`` through bash and return the captured result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(["bash", _script("plex.sh"), "status"])
|
|
|
|
|
|
# Library scanning ───────────────────────────────────────────────────────
|
|
|
|
async def library_list() -> CommandResult:
    """Run ``scan-plex-libraries.sh list`` through bash and return the result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(["bash", _script("scan-plex-libraries.sh"), "list"])
|
|
|
|
|
|
async def library_scan(section_id: str = "") -> CommandResult:
    """Run ``scan-plex-libraries.sh scan``, optionally for a single section.

    Args:
        section_id: Appended as one extra argument when non-empty; omitted
            otherwise.
    """
    # Argument list so run_command shell-quotes both the script path and
    # the section id.
    argv = ["bash", _script("scan-plex-libraries.sh"), "scan"]
    if section_id:
        argv.append(section_id)
    return await run_command(argv)
|
|
|
|
|
|
async def library_refresh(section_id: str = "", force: bool = False) -> CommandResult:
    """Run ``scan-plex-libraries.sh refresh`` with a section argument and optional force flag.

    Args:
        section_id: Always passed as the argument after ``refresh``; an empty
            value is passed as an explicit empty argument so that the force
            flag keeps its position.
        force: When true, a trailing ``true`` argument is appended.
    """
    # Argument list so run_command shell-quotes the script path; an empty
    # section id is quoted to '' and therefore still occupies its slot.
    argv = ["bash", _script("scan-plex-libraries.sh"), "refresh", section_id or ""]
    if force:
        argv.append("true")
    return await run_command(argv)
|
|
|
|
|
|
async def library_analyze(section_id: str = "") -> CommandResult:
    """Run ``scan-plex-libraries.sh analyze``, optionally for a single section.

    Args:
        section_id: Appended as one extra argument when non-empty; omitted
            otherwise.
    """
    # Argument list so run_command shell-quotes both the script path and
    # the section id.
    argv = ["bash", _script("scan-plex-libraries.sh"), "analyze"]
    if section_id:
        argv.append(section_id)
    return await run_command(argv)
|
|
|
|
|
|
# Backup ─────────────────────────────────────────────────────────────────
|
|
|
|
async def backup_run(auto_repair: bool = False) -> CommandResult:
    """Run ``backup-plex.sh --non-interactive`` with a 600-second timeout.

    Args:
        auto_repair: When true, ``--auto-repair`` is appended.
    """
    # Argument list so run_command shell-quotes the script path.
    argv = ["bash", _script("backup-plex.sh"), "--non-interactive"]
    if auto_repair:
        argv.append("--auto-repair")
    return await run_command(argv, timeout=600)
|
|
|
|
|
|
async def backup_integrity_check() -> CommandResult:
    """Run ``backup-plex.sh --check-integrity --non-interactive`` and return the result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(
        ["bash", _script("backup-plex.sh"), "--check-integrity", "--non-interactive"]
    )
|
|
|
|
|
|
async def backup_validate(latest_only: bool = True) -> CommandResult:
    """Run ``validate-plex-backups.sh``, adding ``--latest`` unless disabled.

    Args:
        latest_only: When true (the default), ``--latest`` is appended.
    """
    # Argument list so run_command shell-quotes the script path.
    argv = ["bash", _script("validate-plex-backups.sh")]
    if latest_only:
        argv.append("--latest")
    return await run_command(argv)
|
|
|
|
|
|
async def backup_validate_report() -> CommandResult:
    """Run ``validate-plex-backups.sh --report`` and return the captured result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(["bash", _script("validate-plex-backups.sh"), "--report"])
|
|
|
|
|
|
async def backup_list() -> CommandResult:
    """Run ``plex.sh backups`` through bash and return the captured result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(["bash", _script("plex.sh"), "backups"])
|
|
|
|
|
|
async def backup_builtin_status() -> CommandResult:
    """Run ``check-plex-builtin-backups.sh`` with no arguments and return the result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(["bash", _script("check-plex-builtin-backups.sh")])
|
|
|
|
|
|
async def backup_builtin_detailed() -> CommandResult:
    """Run ``check-plex-builtin-backups.sh --detailed`` and return the result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(
        ["bash", _script("check-plex-builtin-backups.sh"), "--detailed"]
    )
|
|
|
|
|
|
# Monitoring ──────────────────────────────────────────────────────────────
|
|
|
|
async def monitor_dashboard() -> CommandResult:
    """Run ``monitor-plex-backup.sh`` with no arguments and return the result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(["bash", _script("monitor-plex-backup.sh")])
|
|
|
|
|
|
# Database management ────────────────────────────────────────────────────
|
|
|
|
async def db_check() -> CommandResult:
    """Run ``plex-db-manager.sh check`` through bash and return the result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(["bash", _script("plex-db-manager.sh"), "check"])
|
|
|
|
|
|
async def db_repair_gentle() -> CommandResult:
    """Run ``plex-db-manager.sh repair --gentle`` and return the result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(
        ["bash", _script("plex-db-manager.sh"), "repair", "--gentle"]
    )
|
|
|
|
|
|
async def db_repair_force() -> CommandResult:
    """Run ``plex-db-manager.sh repair --force`` and return the result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(
        ["bash", _script("plex-db-manager.sh"), "repair", "--force"]
    )
|
|
|
|
|
|
async def db_cleanup(dry_run: bool = True) -> CommandResult:
    """Run ``cleanup-plex-databases.sh --verbose``, in dry-run mode by default.

    Args:
        dry_run: When true (the default), ``--dry-run`` is passed ahead of
            ``--verbose``.
    """
    # Argument list so run_command shell-quotes the script path.
    argv = ["bash", _script("cleanup-plex-databases.sh")]
    if dry_run:
        argv.append("--dry-run")
    argv.append("--verbose")
    return await run_command(argv)
|
|
|
|
|
|
async def db_install_dbrepair() -> CommandResult:
    """Run ``plex.sh install-dbrepair`` through bash and return the result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(["bash", _script("plex.sh"), "install-dbrepair"])
|
|
|
|
|
|
# Recovery ────────────────────────────────────────────────────────────────
|
|
|
|
async def nuclear_recovery_dry_run() -> CommandResult:
    """Run ``nuclear-plex-recovery.sh --dry-run`` and return the captured result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(
        ["bash", _script("nuclear-plex-recovery.sh"), "--dry-run"]
    )
|
|
|
|
|
|
async def nuclear_recovery_auto() -> CommandResult:
    """Run ``nuclear-plex-recovery.sh --auto`` with a 600-second timeout."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(
        ["bash", _script("nuclear-plex-recovery.sh"), "--auto"], timeout=600
    )
|
|
|
|
|
|
async def nuclear_recovery_verify() -> CommandResult:
    """Run ``nuclear-plex-recovery.sh --verify-only`` and return the captured result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(
        ["bash", _script("nuclear-plex-recovery.sh"), "--verify-only"]
    )
|
|
|
|
|
|
async def validate_recovery(mode: str = "--quick") -> CommandResult:
    """Run ``validate-plex-recovery.sh`` with the given mode flag(s).

    Args:
        mode: Text appended verbatim to the command line (default
            ``--quick``).  It is NOT shell-quoted, so it may carry several
            flags; callers must only pass trusted values.
    """
    # Quote the script path so a directory containing spaces cannot split it;
    # ``mode`` is intentionally left as-is to keep multi-flag values working.
    script = shlex.quote(_script("validate-plex-recovery.sh"))
    return await run_command(f"bash {script} {mode}")
|
|
|
|
|
|
# Queries ─────────────────────────────────────────────────────────────────
|
|
|
|
async def recent_additions(days: int = 7) -> CommandResult:
    """Run ``plex-recent-additions.sh recent <days>`` and return the result.

    Args:
        days: Passed as the argument after ``recent`` (default 7).
    """
    # Argument list so run_command shell-quotes the script path and the
    # stringified day count.
    return await run_command(
        ["bash", _script("plex-recent-additions.sh"), "recent", str(days)]
    )
|
|
|
|
|
|
async def library_stats() -> CommandResult:
    """Run ``plex-recent-additions.sh stats`` and return the captured result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(["bash", _script("plex-recent-additions.sh"), "stats"])
|
|
|
|
|
|
async def media_counts() -> CommandResult:
    """Run ``plex-recent-additions.sh count`` and return the captured result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(["bash", _script("plex-recent-additions.sh"), "count"])
|
|
|
|
|
|
async def list_libraries_query() -> CommandResult:
    """Run ``plex-recent-additions.sh libraries`` and return the captured result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(
        ["bash", _script("plex-recent-additions.sh"), "libraries"]
    )
|
|
|
|
|
|
async def custom_query(sql: str) -> CommandResult:
    """Run ``plex-recent-additions.sh custom <sql>`` and return the result.

    Args:
        sql: Passed to the script as one shell-quoted argument.
            NOTE(review): how the script treats this text is not visible
            here; confirm it is safe for the callers' input.
    """
    # Argument list so run_command shell-quotes both the script path and
    # the SQL text.
    return await run_command(
        ["bash", _script("plex-recent-additions.sh"), "custom", sql]
    )
|
|
|
|
|
|
# Testing ─────────────────────────────────────────────────────────────────
|
|
|
|
async def run_tests_quick() -> CommandResult:
    """Run ``test-plex-backup.sh --quick`` with a 120-second timeout."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(
        ["bash", _script("test-plex-backup.sh"), "--quick"], timeout=120
    )
|
|
|
|
|
|
async def run_tests_unit() -> CommandResult:
    """Run ``test-plex-backup.sh --unit`` with a 300-second timeout."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(
        ["bash", _script("test-plex-backup.sh"), "--unit"], timeout=300
    )
|
|
|
|
|
|
async def run_tests_integration() -> CommandResult:
    """Run ``integration-test-plex.sh --quick`` with a 300-second timeout."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(
        ["bash", _script("integration-test-plex.sh"), "--quick"], timeout=300
    )
|
|
|
|
|
|
async def run_tests_cleanup() -> CommandResult:
    """Run ``test-plex-backup.sh --cleanup`` and return the captured result."""
    # Argument list so run_command shell-quotes the script path.
    return await run_command(["bash", _script("test-plex-backup.sh"), "--cleanup"])
|