feat: Update README and add requirements.txt for rich library installation instructions

This commit is contained in:
Peter Wood
2025-12-09 12:49:20 -05:00
parent 817aed748c
commit e8ce1f192f
3 changed files with 62 additions and 54 deletions

View File

@@ -18,10 +18,11 @@ A Python command-line application to manage Docker containers defined in subdire
- Python 3 - Python 3
- Docker and Docker Compose (plugin) installed. - Docker and Docker Compose (plugin) installed.
- The `rich` Python library: - Python dependencies:
```bash ```bash
pip install rich pip install -r ~/shell/docker-manager/requirements.txt
``` ```
*Or manually:* `pip install rich`
- A `~/docker/` directory containing subdirectories for each of your projects. - A `~/docker/` directory containing subdirectories for each of your projects.
- Each project subdirectory must contain a `docker-compose.yml`, `docker-compose.yaml`, `compose.yml`, or `compose.yaml` file. - Each project subdirectory must contain a `docker-compose.yml`, `docker-compose.yaml`, `compose.yml`, or `compose.yaml` file.

View File

@@ -37,11 +37,17 @@ import argparse
import subprocess import subprocess
import json import json
from pathlib import Path from pathlib import Path
from rich.console import Console
from rich.table import Table try:
from rich.panel import Panel from rich.console import Console
from rich.text import Text from rich.table import Table
from rich import box from rich.panel import Panel
from rich import box
except ImportError:
print("Error: The 'rich' library is required but not installed.")
print("Please install it using: pip install -r ~/shell/docker-manager/requirements.txt")
print("Or directly: pip install rich")
exit(1)
# Configuration # Configuration
DOCKER_ROOT = Path(os.path.expanduser("~/docker")) DOCKER_ROOT = Path(os.path.expanduser("~/docker"))
@@ -55,13 +61,13 @@ def get_diun_container_name():
res = subprocess.run(cmd, capture_output=True, text=True) res = subprocess.run(cmd, capture_output=True, text=True)
if res.returncode == 0 and res.stdout.strip(): if res.returncode == 0 and res.stdout.strip():
return res.stdout.strip().split('\n')[0] return res.stdout.strip().split('\n')[0]
# Fallback to finding by name containing 'diun' # Fallback to finding by name containing 'diun'
cmd = ["docker", "ps", "--filter", "name=diun", "--format", "{{.Names}}"] cmd = ["docker", "ps", "--filter", "name=diun", "--format", "{{.Names}}"]
res = subprocess.run(cmd, capture_output=True, text=True) res = subprocess.run(cmd, capture_output=True, text=True)
if res.returncode == 0 and res.stdout.strip(): if res.returncode == 0 and res.stdout.strip():
return res.stdout.strip().split('\n')[0] return res.stdout.strip().split('\n')[0]
return None return None
@@ -70,14 +76,14 @@ def get_diun_info():
diun_info = {} diun_info = {}
try: try:
diun_container = get_diun_container_name() diun_container = get_diun_container_name()
if not diun_container: if not diun_container:
return diun_info return diun_info
# Get diun image list # Get diun image list
cmd = ["docker", "exec", diun_container, "diun", "image", "list", "--raw"] cmd = ["docker", "exec", diun_container, "diun", "image", "list", "--raw"]
res = subprocess.run(cmd, capture_output=True, text=True) res = subprocess.run(cmd, capture_output=True, text=True)
if res.returncode != 0: if res.returncode != 0:
return diun_info return diun_info
@@ -87,31 +93,31 @@ def get_diun_info():
latest = img.get("latest", {}) latest = img.get("latest", {})
digest = latest.get("digest", "") digest = latest.get("digest", "")
labels = latest.get("labels", {}) labels = latest.get("labels", {})
# Normalize name for matching # Normalize name for matching
# Remove docker.io/ prefix # Remove docker.io/ prefix
norm_name = name norm_name = name
if norm_name.startswith("docker.io/"): if norm_name.startswith("docker.io/"):
norm_name = norm_name[10:] norm_name = norm_name[10:]
# If no registry (no /), prepend library/ # If no registry (no /), prepend library/
if "/" not in norm_name: if "/" not in norm_name:
norm_name = f"library/{norm_name}" norm_name = f"library/{norm_name}"
tag = latest.get("tag", "latest") tag = latest.get("tag", "latest")
if norm_name not in diun_info: if norm_name not in diun_info:
diun_info[norm_name] = {} diun_info[norm_name] = {}
diun_info[norm_name][tag] = { diun_info[norm_name][tag] = {
"digest": digest, "digest": digest,
"labels": labels, "labels": labels,
"original_name": name "original_name": name
} }
except (subprocess.CalledProcessError, json.JSONDecodeError): except (subprocess.CalledProcessError, json.JSONDecodeError):
pass pass
return diun_info return diun_info
@@ -194,20 +200,20 @@ def list_containers(projects):
c_name = container.get('Name', '') c_name = container.get('Name', '')
state = container.get('State', '') state = container.get('State', '')
image = container.get('Image', '') image = container.get('Image', '')
if not c_name: if not c_name:
continue continue
found_any = True found_any = True
# Get version and update status # Get version and update status
version = "unknown" version = "unknown"
update_status = "" update_status = ""
# Normalize image name for lookup # Normalize image name for lookup
norm_image = image norm_image = image
tag = "latest" tag = "latest"
# Remove tag if present (heuristic: split on last colon, if right side has no slashes) # Remove tag if present (heuristic: split on last colon, if right side has no slashes)
if ":" in norm_image: if ":" in norm_image:
base, sep, t = norm_image.rpartition(":") base, sep, t = norm_image.rpartition(":")
@@ -219,37 +225,37 @@ def list_containers(projects):
norm_image = norm_image[10:] norm_image = norm_image[10:]
if "/" not in norm_image: if "/" not in norm_image:
norm_image = f"library/{norm_image}" norm_image = f"library/{norm_image}"
# Check Diun info # Check Diun info
if diun_info: if diun_info:
if norm_image in diun_info: if norm_image in diun_info:
image_tags = diun_info[norm_image] image_tags = diun_info[norm_image]
# Only proceed if we have info for this specific tag # Only proceed if we have info for this specific tag
if tag in image_tags: if tag in image_tags:
info = image_tags[tag] info = image_tags[tag]
# Try to get version from Diun labels first # Try to get version from Diun labels first
version = get_image_version(info.get("labels", {})) version = get_image_version(info.get("labels", {}))
# Check for updates # Check for updates
# First get the Image ID of the running container # First get the Image ID of the running container
inspect_id_cmd = ["docker", "inspect", c_name, "--format", "{{.Image}}"] inspect_id_cmd = ["docker", "inspect", c_name, "--format", "{{.Image}}"]
inspect_id_res = run_command(inspect_id_cmd, path, capture_output=True) inspect_id_res = run_command(inspect_id_cmd, path, capture_output=True)
if inspect_id_res and inspect_id_res.returncode == 0: if inspect_id_res and inspect_id_res.returncode == 0:
image_id = inspect_id_res.stdout.strip() image_id = inspect_id_res.stdout.strip()
# Now inspect the Image ID to get RepoDigests # Now inspect the Image ID to get RepoDigests
inspect_digest_cmd = ["docker", "inspect", image_id, "--format", "{{if .RepoDigests}}{{index .RepoDigests 0}}{{end}}"] inspect_digest_cmd = ["docker", "inspect", image_id, "--format", "{{if .RepoDigests}}{{index .RepoDigests 0}}{{end}}"]
inspect_digest_res = run_command(inspect_digest_cmd, path, capture_output=True) inspect_digest_res = run_command(inspect_digest_cmd, path, capture_output=True)
if inspect_digest_res and inspect_digest_res.returncode == 0: if inspect_digest_res and inspect_digest_res.returncode == 0:
running_digest_full = inspect_digest_res.stdout.strip() running_digest_full = inspect_digest_res.stdout.strip()
# running_digest is like name@sha256:hash # running_digest is like name@sha256:hash
if "@" in running_digest_full: if "@" in running_digest_full:
running_digest = running_digest_full.split("@")[1] running_digest = running_digest_full.split("@")[1]
latest_digest = info.get("digest", "") latest_digest = info.get("digest", "")
if latest_digest: if latest_digest:
if running_digest != latest_digest: if running_digest != latest_digest:
update_status = "[bold red]Update Available[/bold red]" update_status = "[bold red]Update Available[/bold red]"
@@ -259,17 +265,17 @@ def list_containers(projects):
update_status = "[dim]Tag Not Monitored[/dim]" update_status = "[dim]Tag Not Monitored[/dim]"
else: else:
update_status = "[dim]Not Monitored[/dim]" update_status = "[dim]Not Monitored[/dim]"
# If version is still unknown, try to get from running container labels # If version is still unknown, try to get from running container labels
if version == "latest" or version == "unknown": if version in ("latest", "unknown"):
inspect_cmd = ["docker", "inspect", c_name, "--format", "{{json .Config.Labels}}"] inspect_cmd = ["docker", "inspect", c_name, "--format", "{{json .Config.Labels}}"]
inspect_res = run_command(inspect_cmd, path, capture_output=True) inspect_res = run_command(inspect_cmd, path, capture_output=True)
if inspect_res and inspect_res.returncode == 0: if inspect_res and inspect_res.returncode == 0:
try: try:
labels = json.loads(inspect_res.stdout) labels = json.loads(inspect_res.stdout)
version = get_image_version(labels) version = get_image_version(labels)
except: except Exception:
pass pass
table.add_row(name, c_name, state, image, version, update_status) table.add_row(name, c_name, state, image, version, update_status)
@@ -316,12 +322,12 @@ def describe_project(projects, target):
container = json.loads(line) container = json.loads(line)
found_any = True found_any = True
name = container.get('Name', 'N/A') name = container.get('Name', 'N/A')
# Create a table for this container # Create a table for this container
table = Table(show_header=False, box=box.SIMPLE) table = Table(show_header=False, box=box.SIMPLE)
table.add_column("Field", style="bold cyan") table.add_column("Field", style="bold cyan")
table.add_column("Value") table.add_column("Value")
table.add_row("Service", container.get('Service', 'N/A')) table.add_row("Service", container.get('Service', 'N/A'))
table.add_row("Container", name) table.add_row("Container", name)
table.add_row("Image", container.get('Image', 'N/A')) table.add_row("Image", container.get('Image', 'N/A'))
@@ -338,7 +344,7 @@ def describe_project(projects, target):
if inspect_res and inspect_res.returncode == 0: if inspect_res and inspect_res.returncode == 0:
try: try:
details = json.loads(inspect_res.stdout.strip()) details = json.loads(inspect_res.stdout.strip())
# Description # Description
labels = details.get("Config", {}).get("Labels", {}) labels = details.get("Config", {}).get("Labels", {})
description = labels.get("org.opencontainers.image.description") description = labels.get("org.opencontainers.image.description")
@@ -354,24 +360,24 @@ def describe_project(projects, target):
v_table.add_column("Type", style="cyan") v_table.add_column("Type", style="cyan")
v_table.add_column("Source", style="yellow") v_table.add_column("Source", style="yellow")
v_table.add_column("Destination", style="green") v_table.add_column("Destination", style="green")
for mount in mounts: for mount in mounts:
m_type = mount.get("Type") m_type = mount.get("Type")
source = mount.get("Source", "") source = mount.get("Source", "")
dest = mount.get("Destination", "") dest = mount.get("Destination", "")
v_table.add_row(m_type, source, dest) v_table.add_row(m_type, source, dest)
console.print(v_table) console.print(v_table)
console.print("") # Spacing console.print("") # Spacing
except json.JSONDecodeError: except json.JSONDecodeError:
pass pass
else: else:
console.print(Panel(table, title=f"[bold]{name}[/bold]", border_style="green")) console.print(Panel(table, title=f"[bold]{name}[/bold]", border_style="green"))
except json.JSONDecodeError: except json.JSONDecodeError:
pass pass
if not found_any: if not found_any:
console.print("[yellow]No containers found for this project.[/yellow]") console.print("[yellow]No containers found for this project.[/yellow]")
else: else:
@@ -385,7 +391,7 @@ def list_project_volumes(projects, target):
return return
path = projects[target] path = projects[target]
table = Table(title=f"Volumes for project: {target}", box=box.ROUNDED) table = Table(title=f"Volumes for project: {target}", box=box.ROUNDED)
table.add_column("Service", style="cyan") table.add_column("Service", style="cyan")
table.add_column("Container", style="blue") table.add_column("Container", style="blue")
@@ -405,10 +411,10 @@ def list_project_volumes(projects, target):
container = json.loads(line) container = json.loads(line)
name = container.get('Name', 'N/A') name = container.get('Name', 'N/A')
service = container.get('Service', 'N/A') service = container.get('Service', 'N/A')
inspect_cmd = ["docker", "inspect", name, "--format", "{{json .Mounts}}"] inspect_cmd = ["docker", "inspect", name, "--format", "{{json .Mounts}}"]
inspect_res = run_command(inspect_cmd, path, capture_output=True) inspect_res = run_command(inspect_cmd, path, capture_output=True)
if inspect_res and inspect_res.returncode == 0: if inspect_res and inspect_res.returncode == 0:
try: try:
mounts = json.loads(inspect_res.stdout.strip()) mounts = json.loads(inspect_res.stdout.strip())
@@ -422,7 +428,7 @@ def list_project_volumes(projects, target):
pass pass
except json.JSONDecodeError: except json.JSONDecodeError:
pass pass
console.print(table) console.print(table)
@@ -456,7 +462,7 @@ def manage_project(projects, action, target, extra_args=None, container_filter=N
console.print("[red]Error: Logs can only be viewed for one project at a time.[/red]") console.print("[red]Error: Logs can only be viewed for one project at a time.[/red]")
return return
name, path = targets[0] name, path = targets[0]
# Filter by container/service if specified # Filter by container/service if specified
services_to_log = [] services_to_log = []
if container_filter: if container_filter:
@@ -475,7 +481,7 @@ def manage_project(projects, action, target, extra_args=None, container_filter=N
services_to_log.append(s_name) services_to_log.append(s_name)
except json.JSONDecodeError: except json.JSONDecodeError:
pass pass
if not services_to_log: if not services_to_log:
console.print(f"[yellow]No matching service/container found for '{container_filter}'. Showing all logs.[/yellow]") console.print(f"[yellow]No matching service/container found for '{container_filter}'. Showing all logs.[/yellow]")
else: else:
@@ -484,7 +490,7 @@ def manage_project(projects, action, target, extra_args=None, container_filter=N
console.print(f"[blue]Viewing logs for {name}...[/blue]") console.print(f"[blue]Viewing logs for {name}...[/blue]")
cmd = ["docker", "compose", "logs"] + (extra_args or []) cmd = ["docker", "compose", "logs"] + (extra_args or [])
if services_to_log: if services_to_log:
cmd.extend(services_to_log) cmd.extend(services_to_log)

View File

@@ -0,0 +1 @@
rich>=13.0.0