Files
shell/backup-web-app.py

524 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Backup Web Application
A Flask-based web interface for monitoring and managing backup files.
Integrates with the backup metrics JSON generator to provide:
- Real-time backup status monitoring
- Log file viewing
- Backup file downloads
- Service health dashboard
Author: Shell Repository
"""
import os
import json
import logging
from datetime import datetime
from flask import Flask, render_template, jsonify, request, abort
from werkzeug.utils import secure_filename
import subprocess
# --- Configuration ---------------------------------------------------------
# Root of the backup tree; can be overridden with the BACKUP_ROOT env variable.
BACKUP_ROOT = os.environ.get('BACKUP_ROOT', '/mnt/share/media/backups')
# Directory holding the per-service "<service>_status.json" files.
METRICS_DIR = os.path.join(BACKUP_ROOT, 'metrics')
LOG_FILE = '/tmp/backup-web-app.log'

# --- Logging ---------------------------------------------------------------
# Records are written both to LOG_FILE and to the default console stream.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

# --- Flask application -----------------------------------------------------
app = Flask(__name__)
app.config.update(
    # NOTE: a fresh random key is generated every time this module is loaded,
    # so the key differs between restarts and between processes.
    SECRET_KEY=os.urandom(24),
    MAX_CONTENT_LENGTH=16 * 1024 * 1024,  # 16MB max
)
def load_json_file(filepath):
    """Load and parse a JSON file, returning ``None`` on any failure.

    Args:
        filepath: Path of the JSON file to read (decoded as UTF-8).

    Returns:
        The parsed JSON value, or ``None`` when the file does not exist,
        cannot be read, or does not contain valid UTF-8 encoded JSON.
    """
    try:
        # Open directly rather than testing os.path.exists() first: the file
        # may disappear between the check and the open, and the extra stat
        # call buys nothing.
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        # A missing file is not an error for callers; stay silent.
        return None
    except (OSError, json.JSONDecodeError, UnicodeDecodeError) as e:
        logger.error("Error loading JSON file %s: %s", filepath, e)
        return None
def get_services():
    """Return the sorted names of the service directories under BACKUP_ROOT.

    Every sub-directory of BACKUP_ROOT other than ``metrics`` is treated as a
    service. The result is an empty list when BACKUP_ROOT does not exist.
    """
    if not os.path.exists(BACKUP_ROOT):
        return []
    names = [
        entry
        for entry in os.listdir(BACKUP_ROOT)
        if entry != 'metrics'
        and os.path.isdir(os.path.join(BACKUP_ROOT, entry))
    ]
    names.sort()
    return names
def get_service_metrics(service_name):
    """Summarise the status file of one service.

    Reads ``<METRICS_DIR>/<service_name>_status.json`` and returns a dict
    with the raw payload under ``status`` plus a few convenience fields.
    When the file is missing, unreadable or empty, the fields fall back to
    ``never_run`` / zero defaults.
    """
    status_path = os.path.join(METRICS_DIR, f'{service_name}_status.json')
    status = load_json_file(status_path)

    if not status:
        # No usable status payload: report placeholder values.
        return {
            'status': status,
            'last_run': None,
            'current_status': 'never_run',
            'files_processed': 0,
            'total_size': 0,
            'duration': 0,
        }

    return {
        'status': status,
        'last_run': status.get('end_time'),
        'current_status': status.get('status', 'unknown'),
        'files_processed': status.get('files_processed', 0),
        'total_size': status.get('total_size_bytes', 0),
        'duration': status.get('duration_seconds', 0),
    }
def get_consolidated_metrics():
    """Collect the status payloads of all services found in METRICS_DIR.

    Every ``*_status.json`` file in METRICS_DIR is loaded; files that are
    unreadable or whose payload is empty are skipped.

    Returns:
        A dict with ``services`` (service name -> status payload),
        ``total_services`` and an ISO formatted ``last_updated`` timestamp.
    """
    suffix = '_status.json'
    services = {}
    if os.path.isdir(METRICS_DIR):
        for filename in os.listdir(METRICS_DIR):
            if not filename.endswith(suffix):
                continue
            # Strip only the trailing suffix; str.replace() would also remove
            # occurrences of the marker inside the service name itself.
            service_name = filename[:-len(suffix)]
            status = load_json_file(os.path.join(METRICS_DIR, filename))
            if status:
                services[service_name] = status
    return {
        'services': services,
        'total_services': len(services),
        'last_updated': datetime.now().isoformat()
    }
def _service_for_log_name(name_lower):
    """Guess the owning service from a lower-cased log file name."""
    # Checked in order; the first keyword contained in the name wins.
    for keyword, service in (
        ('docker', 'docker'),
        ('media', 'media-services'),
        ('plex', 'plex'),
        ('immich', 'immich'),
    ):
        if keyword in name_lower:
            return service
    return 'general'


def _log_file_entry(name, path, service):
    """Build the metadata dict describing a single log file."""
    return {
        'name': name,
        'path': path,
        'service': service,
        'size': os.path.getsize(path),
        'modified': datetime.fromtimestamp(os.path.getmtime(path))
    }


def get_log_files(service_name=None):
    """List log files for one service, or for all services.

    Two locations are scanned for ``*.log`` files: the centralized shell
    logs directory and the service directories below BACKUP_ROOT.

    Args:
        service_name: Service to filter by. ``None`` returns every
            centralized log plus the logs of every service directory.
            Otherwise centralized logs are kept when they are classified as
            that service, classified as ``general``, or contain the service
            name in their lower-cased file name.

    Returns:
        A list of dicts (``name``, ``path``, ``service``, ``size``,
        ``modified``) sorted by modification time, newest first. Each path
        appears at most once.
    """
    log_files = []
    seen_paths = set()  # O(1) duplicate detection instead of list scans

    def add(name, path, service):
        if path not in seen_paths:
            seen_paths.add(path)
            log_files.append(_log_file_entry(name, path, service))

    # Centralized logs directory first.
    shell_logs_dir = '/home/acedanger/shell/logs'
    if os.path.isdir(shell_logs_dir):
        for item in os.listdir(shell_logs_dir):
            log_path = os.path.join(shell_logs_dir, item)
            if not item.endswith('.log') or not os.path.isfile(log_path):
                continue
            item_lower = item.lower()
            service_from_filename = _service_for_log_name(item_lower)
            # When filtering, keep logs that match the service or are general.
            if (service_name is None or
                    service_from_filename == service_name or
                    service_from_filename == 'general' or
                    service_name in item_lower):
                add(item, log_path, service_from_filename)

    # Then the service directories below BACKUP_ROOT. The central directory
    # is scanned only once above, not once per service.
    if service_name:
        services_to_scan = [service_name]
    elif service_name is None:
        services_to_scan = get_services()
    else:
        # Empty service name: only the centralized logs are reported.
        services_to_scan = []

    for service in services_to_scan:
        service_path = os.path.join(BACKUP_ROOT, service)
        if not os.path.isdir(service_path):
            continue
        for item in os.listdir(service_path):
            log_path = os.path.join(service_path, item)
            if item.endswith('.log') and os.path.isfile(log_path):
                add(item, log_path, service)

    return sorted(log_files, key=lambda x: x['modified'], reverse=True)
def get_backup_files(service_name):
    """List the backup files of a service.

    Looks at the files directly inside ``<BACKUP_ROOT>/<service_name>`` and
    inside its ``scheduled`` sub-directory. Files ending in ``.log`` are
    not considered backups and are skipped.

    Args:
        service_name: Name of the service directory below BACKUP_ROOT.

    Returns:
        A list of dicts (``name``, ``path``, ``relative_path`` relative to
        BACKUP_ROOT, ``size``, ``modified``, ``is_scheduled``) sorted by
        modification time, newest first.
    """
    service_path = os.path.join(BACKUP_ROOT, service_name)
    scheduled_path = os.path.join(service_path, 'scheduled')

    backup_files = []
    # Carry the "scheduled" flag with the directory being scanned. Deriving
    # it from a substring test on the path would mis-flag every file when
    # the backup root or the service name itself contains "scheduled".
    for path, is_scheduled in ((service_path, False), (scheduled_path, True)):
        if not os.path.isdir(path):
            continue
        for item in os.listdir(path):
            item_path = os.path.join(path, item)
            if os.path.isfile(item_path) and not item.endswith('.log'):
                backup_files.append({
                    'name': item,
                    'path': item_path,
                    'relative_path': os.path.relpath(item_path, BACKUP_ROOT),
                    'size': os.path.getsize(item_path),
                    'modified': datetime.fromtimestamp(os.path.getmtime(item_path)),
                    'is_scheduled': is_scheduled
                })
    return sorted(backup_files, key=lambda x: x['modified'], reverse=True)
@app.route('/')
def index():
    """Render the main dashboard.

    Passes ``dashboard.html`` a ``data`` dict containing:

    - ``services``: the status payload of every service with a status file,
      each extended with ``backup_path`` when its backup directory exists;
    - ``summary``: counts of ``success`` / ``partial`` / ``failed`` statuses
      and the total number of services;
    - ``recent_logs``: the ten most recently modified log files;
    - ``last_updated``: an ISO formatted timestamp.

    Returns a plain-text error with status 500 when a file-system error
    occurs while collecting the data.
    """
    try:
        # Maps the raw status value to the summary counter it feeds.
        counts = {'success': 0, 'partial': 0, 'failed': 0}
        services_data = []

        # Reuse the shared status-file scan instead of duplicating it here.
        all_statuses = get_consolidated_metrics()['services']
        for service_name, status in all_statuses.items():
            if not isinstance(status, dict):
                # A status file holding e.g. a JSON list cannot be rendered.
                logger.warning(
                    "Ignoring malformed status for service %s", service_name)
                continue

            outcome = status.get('status')
            if isinstance(outcome, str) and outcome in counts:
                counts[outcome] += 1

            # Add backup path information
            service_backup_path = os.path.join(BACKUP_ROOT, service_name)
            if os.path.exists(service_backup_path):
                status['backup_path'] = service_backup_path

            services_data.append(status)

        summary = {
            'successful': counts['success'],
            'partial': counts['partial'],
            'failed': counts['failed'],
            'total': len(services_data)
        }

        dashboard_data = {
            'services': services_data,
            'summary': summary,
            'recent_logs': get_log_files()[:10],  # ten newest log files
            'last_updated': datetime.now().isoformat()
        }
        return render_template('dashboard.html', data=dashboard_data)
    except (OSError, json.JSONDecodeError) as e:
        logger.error("Error in index route: %s", e)
        return f"Error: {e}", 500
@app.route('/api/services')
def api_services():
    """Return the list of backup service names as a JSON response."""
    service_names = get_services()
    return jsonify(service_names)
@app.route('/api/service/<service_name>')
def api_service_details(service_name):
    """Return metrics, backup files and log files of one service as JSON.

    The service name from the URL is sanitized with ``secure_filename``
    before it is used to build any path. A name that sanitizes to an empty
    string yields a 404 JSON error; a file-system error yields a 500 JSON
    error.
    """
    service_name = secure_filename(service_name)
    if not service_name:
        # An empty name would make every lookup below target BACKUP_ROOT
        # itself instead of a service directory.
        return jsonify({'error': 'Service not found'}), 404
    try:
        return jsonify({
            'service': service_name,
            'metrics': get_service_metrics(service_name),
            'backup_files': get_backup_files(service_name),
            'log_files': get_log_files(service_name)
        })
    except (OSError, json.JSONDecodeError) as e:
        logger.error("Error getting service details for %s: %s",
                     service_name, e)
        return jsonify({'error': str(e)}), 500
@app.route('/api/metrics/consolidated')
def api_consolidated_metrics():
    """Return the consolidated metrics of all services as a JSON response."""
    consolidated = get_consolidated_metrics()
    return jsonify(consolidated)
@app.route('/service/<service_name>')
def service_detail(service_name):
    """Render the detail page of one service.

    The service name from the URL is sanitized with ``secure_filename``; a
    name that sanitizes to an empty string results in a 404. The template
    ``service.html`` receives the service's status payload (or a
    placeholder when no status file is available), extended with
    ``backup_path`` and ``latest_backup`` when the backup directory exists.
    """
    service_name = secure_filename(service_name)
    if not service_name:
        # An empty name would resolve to BACKUP_ROOT itself rather than to a
        # service directory.
        abort(404)
    try:
        # Get the service status from metrics file
        status_file = os.path.join(METRICS_DIR, f'{service_name}_status.json')
        service_data = load_json_file(status_file)
        if not service_data:
            # Create basic service data if no metrics file exists
            service_data = {
                'service': service_name,
                'description': f'{service_name.title()} service',
                'status': 'unknown',
                'message': 'No metrics available'
            }

        # Add backup path information
        service_backup_path = os.path.join(BACKUP_ROOT, service_name)
        if os.path.exists(service_backup_path):
            service_data['backup_path'] = service_backup_path
            # get_backup_files() returns newest first, so index 0 is the
            # latest backup.
            backup_files = get_backup_files(service_name)
            if backup_files:
                service_data['latest_backup'] = backup_files[0]['path']

        return render_template('service.html', service=service_data)
    except (OSError, json.JSONDecodeError) as e:
        logger.error("Error in service detail for %s: %s", service_name, e)
        return f"Error: {e}", 500
def _format_file_size(size_bytes):
    """Format a byte count as a short human-readable string (B/KB/MB/GB)."""
    if size_bytes < 1024:
        return f"{size_bytes} B"
    if size_bytes < 1024 * 1024:
        return f"{size_bytes / 1024:.1f} KB"
    if size_bytes < 1024 * 1024 * 1024:
        return f"{size_bytes / (1024 * 1024):.1f} MB"
    return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB"


@app.route('/logs')
def logs_view():
    """Render the log list page, optionally filtered by service.

    Query parameters:
        service: Optional service name to filter by. It is sanitized with
            ``secure_filename`` before use.

    The template ``logs.html`` receives ``logs`` (one dict per log file with
    raw and formatted size / modification time) and ``filter_service``.
    Returns a plain-text error with status 500 on a file-system error.
    """
    try:
        service_filter = request.args.get('service')
        if service_filter is not None:
            # The value ends up in os.path.join(BACKUP_ROOT, ...); sanitize it
            # like the other routes do so that "../" or an absolute path
            # cannot point the directory scan outside the backup root.
            service_filter = secure_filename(service_filter)

        formatted_logs = [
            {
                'name': log['name'],
                'filename': log['name'],  # For backward compatibility
                'path': log['path'],
                'service': log['service'],
                'size': log['size'],
                'size_formatted': _format_file_size(log['size']),
                'modified': log['modified'],
                'modified_time': log['modified'].strftime("%Y-%m-%d %H:%M:%S")
            }
            for log in get_log_files(service_filter)
        ]
        return render_template('logs.html', logs=formatted_logs,
                               filter_service=service_filter)
    except OSError as e:
        logger.error("Error in logs view: %s", e)
        return f"Error: {e}", 500
@app.route('/log/<filename>')
def view_log(filename):
    """Render the tail of a log file.

    The file name from the URL is sanitized with ``secure_filename`` and
    then searched, in order, in the centralized logs directory, in each
    service directory below BACKUP_ROOT, and in ``<BACKUP_ROOT>/logs``.
    A 404 is raised when no regular file with that name is found.

    Query parameters:
        lines: Maximum number of trailing lines to show (default 1000).
            Non-numeric values fall back to the default; values below 1 are
            raised to 1.

    The template ``log_viewer.html`` receives the file name, the selected
    content, the formatted size and modification time, the total number of
    lines in the file and the number of lines shown.
    """
    # Local import: the module's import block does not provide deque.
    from collections import deque

    def candidate_paths(name):
        # Generator, so get_services() only runs when the centralized
        # directory does not contain the file.
        yield os.path.join('/home/acedanger/shell/logs', name)
        for service in get_services():
            yield os.path.join(BACKUP_ROOT, service, name)
        yield os.path.join(BACKUP_ROOT, 'logs', name)

    try:
        # Security: ensure the filename is safe
        filename = secure_filename(filename)
        if not filename:
            abort(404)

        # isfile() rather than exists(): a directory must not be "found".
        log_path = next(
            (path for path in candidate_paths(filename)
             if os.path.isfile(path)),
            None)
        if log_path is None:
            abort(404)

        # type=int makes a non-numeric value fall back to the default rather
        # than raising a ValueError whose message would echo user input back
        # in the error response.
        max_lines = max(1, request.args.get('lines', 1000, type=int))

        # Stream the file, keeping only the last max_lines lines in memory
        # while still counting every line. Undecodable bytes are replaced so
        # a log with a stray non-UTF-8 byte can still be viewed.
        total_lines = 0
        tail = deque(maxlen=max_lines)
        with open(log_path, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                total_lines += 1
                tail.append(line)
        content = ''.join(tail)

        # Get file info
        file_size = os.path.getsize(log_path)
        last_modified = datetime.fromtimestamp(os.path.getmtime(log_path))
        return render_template('log_viewer.html',
                               filename=filename,
                               content=content,
                               file_size=f"{file_size:,} bytes",
                               last_modified=last_modified.strftime(
                                   "%Y-%m-%d %H:%M:%S"),
                               total_lines=total_lines,
                               lines_shown=len(tail))
    except (OSError, UnicodeDecodeError, ValueError) as e:
        logger.error("Error viewing log %s: %s", filename, e)
        return f"Error: {e}", 500
@app.route('/api/refresh-metrics')
def api_refresh_metrics():
    """Run the metrics generator script and report the outcome as JSON.

    The script ``generate-backup-metrics.sh`` is expected next to this
    file and is run with BACKUP_ROOT exported in its environment and a
    five minute timeout.

    Responses:
        200 - script exited with code 0 (its stdout is included)
        404 - script not found
        408 - script exceeded the timeout
        500 - script exited non-zero (stderr included) or could not be run
    """
    try:
        here = os.path.dirname(__file__)
        script_path = os.path.join(here, 'generate-backup-metrics.sh')

        if not os.path.exists(script_path):
            return jsonify({
                'status': 'error',
                'message': 'Metrics generator script not found'
            }), 404

        # Child environment: ours plus the backup root in use.
        child_env = dict(os.environ, BACKUP_ROOT=BACKUP_ROOT)
        result = subprocess.run(
            [script_path],
            env=child_env,
            capture_output=True,
            text=True,
            timeout=300,  # 5 minute timeout
            check=False
        )

        if result.returncode != 0:
            logger.error("Metrics refresh failed: %s", result.stderr)
            return jsonify({
                'status': 'error',
                'message': 'Metrics refresh failed',
                'error': result.stderr
            }), 500

        logger.info("Metrics refresh completed successfully")
        return jsonify({
            'status': 'success',
            'message': 'Metrics refreshed successfully',
            'output': result.stdout
        })
    except subprocess.TimeoutExpired:
        return jsonify({
            'status': 'error',
            'message': 'Metrics refresh timed out'
        }), 408
    except (OSError, subprocess.SubprocessError) as e:
        logger.error("Error refreshing metrics: %s", e)
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500
@app.route('/health')
def health_check():
    """Report basic liveness information as JSON.

    The payload contains a fixed ``healthy`` status, the current timestamp,
    the configured backup and metrics directories and the number of
    services found.
    """
    payload = {
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'backup_root': BACKUP_ROOT,
        'metrics_dir': METRICS_DIR,
        'services_count': len(get_services()),
    }
    return jsonify(payload)
@app.errorhandler(404)
def not_found(_error):
    """Render the generic error page for 404 responses."""
    page = render_template(
        'error.html',
        error_code=404,
        error_message="Page not found",
    )
    return page, 404
@app.errorhandler(500)
def internal_error(_error):
    """Render the generic error page for 500 responses."""
    page = render_template(
        'error.html',
        error_code=500,
        error_message="Internal server error",
    )
    return page, 500
if __name__ == '__main__':
    # The metrics directory must exist before the app starts reading it.
    os.makedirs(METRICS_DIR, exist_ok=True)

    # Development server settings, taken from the environment:
    # PORT selects the port, FLASK_DEBUG=true enables debug mode.
    listen_port = int(os.environ.get('PORT', 5000))
    debug_enabled = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true'

    app.run(host='0.0.0.0', port=listen_port, debug=debug_enabled)