Files
PiCopy/picopy/system.py

223 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""PiCopy Systeminfo, Format-Drives, Update-System."""
import os
import shutil
import subprocess
import threading
import time
from datetime import datetime
from pathlib import Path
import urllib.request as _urlreq
from picopy.config import BASE_DIR, RAW_BASE, VERSION, _atomic_write, log
from picopy.state import copy_state
# Shared status of the update checker. Every mutation in this module is
# performed while holding ``update_lock``.
update_state = dict(
    current=VERSION,     # version of the running installation
    latest=None,         # version string last read from the remote version.txt
    available=False,     # True when ``latest`` compares newer than ``current``
    checking=False,      # True while a check is in flight
    error=None,          # message of the last failed check, if any
    last_checked=None,   # ISO timestamp of the last successful check
)
update_lock = threading.Lock()

# Progress and result of the most recent drive-format job (written by do_format).
format_state = {
    'running': False,
    'error': None,
    'done': False,
    'fs': '',
    'device': '',
}
# Supported target filesystems for do_format().
# Each entry provides:
#   label - short display name
#   desc  - user-facing description (German UI text)
#   cmd   - callable (dev, name) -> argv list for the mkfs tool; the volume
#           label is clamped to the filesystem's maximum label length
#   pkg   - Debian package that ships the mkfs tool
FORMAT_FILESYSTEMS = {
    'exfat': {
        'label': 'exFAT',
        'desc': 'Empfohlen Mac & Windows, keine 4-GB-Dateigrößenbeschränkung',
        # exFAT volume labels hold at most 11 characters; clamp like the
        # other filesystems so an over-long name does not make mkfs fail.
        'cmd': lambda dev, name: ['mkfs.exfat', '-n', name[:11], dev],
        'pkg': 'exfatprogs',
    },
    'fat32': {
        'label': 'FAT32',
        'desc': 'Mac & Windows, max. 4 GB pro Datei',
        'cmd': lambda dev, name: ['mkfs.vfat', '-F', '32', '-n', name[:11], dev],
        'pkg': 'dosfstools',
    },
    'ntfs': {
        'label': 'NTFS',
        'desc': 'Windows nativ, Mac nur lesen',
        'cmd': lambda dev, name: ['mkfs.ntfs', '-f', '-L', name[:32], dev],
        'pkg': 'ntfs-3g',
    },
}
# All files that have to be downloaded during an update, as paths relative
# to both the remote base URL and the local installation directory.
UPDATE_FILES = [
    # Entry point, version marker and logo
    'app.py', 'version.txt', 'PiCopy_Logo.png',
    # Core package
    'picopy/__init__.py', 'picopy/config.py', 'picopy/state.py',
    'picopy/usb.py', 'picopy/copy_engine.py', 'picopy/wifi.py',
    'picopy/wireguard.py', 'picopy/samba.py', 'picopy/upload.py',
    'picopy/system.py',
    # HTTP routes
    'routes/__init__.py', 'routes/copy_routes.py', 'routes/wifi_routes.py',
    'routes/wireguard_routes.py', 'routes/upload_routes.py',
    'routes/system_routes.py', 'routes/browse_routes.py',
    # Web UI template
    'templates/index.html',
]
def get_sysinfo() -> dict:
    """Collect CPU temperature, RAM usage and root-filesystem usage.

    Every probe is best-effort: if one fails, its keys are set to ``None``
    and the remaining probes still run.

    Returns:
        A dict with the keys ``cpu_temp``, ``ram_total``, ``ram_used``,
        ``ram_pct``, ``disk_total``, ``disk_used`` and ``disk_pct``.
    """
    stats: dict = {}

    # CPU temperature (Raspberry Pi): use the first thermal zone that can be
    # read; the raw value is divided by 1000 (presumably millidegrees Celsius).
    thermal_files = (
        '/sys/class/thermal/thermal_zone0/temp',
        '/sys/class/thermal/thermal_zone1/temp',
    )
    for thermal_file in thermal_files:
        try:
            reading = int(Path(thermal_file).read_text().strip())
            stats['cpu_temp'] = round(reading / 1000, 1)
            break
        except Exception:
            stats['cpu_temp'] = None

    # RAM: parse "<Name>: <value> ..." rows of /proc/meminfo, then scale the
    # values down by 1024.
    try:
        meminfo: dict = {}
        for row in Path('/proc/meminfo').read_text().splitlines():
            fields = row.split()
            if len(fields) < 2:
                continue
            meminfo[fields[0].rstrip(':')] = int(fields[1])
        total_mem = meminfo.get('MemTotal', 0)
        used_mem = total_mem - meminfo.get('MemAvailable', 0)
        stats['ram_total'] = round(total_mem / 1024)
        stats['ram_used'] = round(used_mem / 1024)
        if total_mem:
            stats['ram_pct'] = round(used_mem / total_mem * 100)
        else:
            stats['ram_pct'] = 0
    except Exception:
        stats.update(ram_total=None, ram_used=None, ram_pct=None)

    # SD card (root filesystem): bytes converted to decimal gigabytes.
    try:
        usage = shutil.disk_usage('/')
        stats['disk_total'] = round(usage.total / 1e9, 1)
        stats['disk_used'] = round(usage.used / 1e9, 1)
        if usage.total:
            stats['disk_pct'] = round(usage.used / usage.total * 100)
        else:
            stats['disk_pct'] = 0
    except Exception:
        stats.update(disk_total=None, disk_used=None, disk_pct=None)

    return stats
def _vtuple(v):
try:
return tuple(int(x) for x in v.strip().lstrip('v').split('.'))
except Exception:
return (0,)
def check_for_updates():
    """Fetch the remote ``version.txt`` and record the result in ``update_state``.

    Returns immediately if another check is already in flight. On success
    ``latest``, ``available`` and ``last_checked`` are updated; on any failure
    the error text is stored in ``update_state['error']`` and a warning is
    logged. The ``checking`` flag is always cleared at the end. No exception
    escapes this function.
    """
    with update_lock:
        if update_state['checking']:
            return
        update_state['checking'] = True
        update_state['error'] = None
    try:
        # Use the response as a context manager so the connection is closed
        # even when reading or decoding fails.
        with _urlreq.urlopen(f'{RAW_BASE}/version.txt', timeout=10) as resp:
            latest = resp.read().decode().strip()
        avail = _vtuple(latest) > _vtuple(VERSION)
        with update_lock:
            update_state.update(latest=latest, available=avail,
                                last_checked=datetime.now().isoformat())
        if avail:
            log.info(f'Update verfügbar: {VERSION} -> {latest}')
    except Exception as e:
        with update_lock:
            update_state['error'] = str(e)
        log.warning(f'Update-Check fehlgeschlagen: {e}')
    finally:
        with update_lock:
            update_state['checking'] = False
def update_check_loop():
    """Run update checks forever; never returns.

    Waits 5 seconds, then calls ``check_for_updates()`` and sleeps six hours
    between consecutive checks.
    """
    startup_delay_s = 5          # check once shortly after start
    check_interval_s = 6 * 3600  # then every six hours

    time.sleep(startup_delay_s)
    while True:
        check_for_updates()
        time.sleep(check_interval_s)
def _download_update_file(url, timeout):
    """Return the body of ``url`` as bytes, closing the HTTP response afterwards."""
    with _urlreq.urlopen(url, timeout=timeout) as resp:
        return resp.read()


def _replace_file_atomically(dest, data):
    """Write ``data`` to a sibling ``.tmp`` file, fsync it, then rename it over ``dest``."""
    tmp = dest.with_suffix(dest.suffix + '.tmp')
    with open(tmp, 'wb') as fh:
        fh.write(data)
        fh.flush()
        # Sync through the descriptor that wrote the data before the rename.
        os.fsync(fh.fileno())
    os.replace(str(tmp), str(dest))


def install_update():
    """Download the update, validate it, install it and restart the service.

    The work is done in three phases so that a failure never leaves a
    half-written installation behind:

    1. Download: ``version.txt``, ``app.py`` and the logo are mandatory and
       any error while fetching them propagates. All other entries of
       ``UPDATE_FILES`` are best-effort; a failed download is logged and
       that file is left untouched.
    2. Validate: every downloaded ``.py`` file is compiled; a syntax error
       aborts the update before anything is written.
    3. Install: each downloaded file replaces its destination under
       ``BASE_DIR`` via write-to-temp + rename, then ``systemctl restart
       picopy`` is started.

    Raises:
        Exception: whatever ``urlopen`` raises for a mandatory file.
        SyntaxError / ValueError: if a downloaded Python file does not compile.
        OSError: if a file cannot be written.
    """
    log.info('Update wird heruntergeladen...')

    # Phase 1: download everything into memory.
    new_version = _download_update_file(f'{RAW_BASE}/version.txt', 10).decode().strip()
    staged = {
        'version.txt': (new_version + '\n').encode('utf-8'),
        'app.py': _download_update_file(f'{RAW_BASE}/app.py', 60),
        'PiCopy_Logo.png': _download_update_file(f'{RAW_BASE}/PiCopy_Logo.png', 30),
    }
    for rel_path in UPDATE_FILES:
        if rel_path in staged:
            continue
        try:
            staged[rel_path] = _download_update_file(f'{RAW_BASE}/{rel_path}', 60)
        except Exception as e:
            log.warning(f'Update: {rel_path} konnte nicht heruntergeladen werden: {e}')

    # Phase 2: syntax-check every Python file before touching the disk.
    for rel_path, data in staged.items():
        if rel_path.endswith('.py'):
            compile(data, rel_path, 'exec')

    # Phase 3: write the files listed in UPDATE_FILES that were downloaded.
    for rel_path in UPDATE_FILES:
        if rel_path not in staged:
            continue
        dest = BASE_DIR / rel_path
        dest.parent.mkdir(parents=True, exist_ok=True)
        _replace_file_atomically(dest, staged[rel_path])

    log.info('Update installiert - starte Dienst neu...')
    # Fire and forget: the restart terminates this process.
    subprocess.Popen(['systemctl', 'restart', 'picopy'])
def do_format(fs: str, name: str, dev: str):
    """Format a drive; intended to run in a worker thread.

    Progress and outcome are reported through ``format_state``: ``running``
    is set on entry and always cleared on exit, ``done`` is set on success,
    and ``error`` holds a message on failure. No exception escapes.

    Args:
        fs: Key into ``FORMAT_FILESYSTEMS`` selecting the target filesystem.
        name: Volume label passed to the mkfs command builder.
        dev: Device path to unmount and format.
    """
    format_state.update(running=True, error=None, done=False, fs=fs, device=dev)
    try:
        spec = FORMAT_FILESYSTEMS.get(fs)
        if spec is None:
            # Report a readable message instead of a bare KeyError repr, and
            # do so before the device gets unmounted.
            format_state.update(error=f'Unbekanntes Dateisystem: {fs}')
            return
        pkg = spec['pkg']
        missing_tool_hint = f'Befehl nicht gefunden bitte installieren: apt install {pkg}'

        # Unmount if mounted; the result is deliberately ignored.
        subprocess.run(['umount', dev], capture_output=True)

        cmd = spec['cmd'](dev, name)
        try:
            r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        except FileNotFoundError:
            # With an argv list a missing binary raises instead of returning
            # exit code 127, so the install hint has to be produced here.
            format_state.update(error=missing_tool_hint)
            return

        if r.returncode != 0:
            err = r.stderr.strip() or r.stdout.strip() or 'Unbekannter Fehler'
            # Helpful message when the package is missing
            if 'not found' in err or r.returncode == 127:
                err = missing_tool_hint
            format_state.update(error=err)
            return

        format_state.update(done=True)
        log.info(f'Formatierung {fs} auf {dev} abgeschlossen')
    except subprocess.TimeoutExpired:
        format_state.update(error='Timeout Formatierung dauerte zu lange')
    except Exception as e:
        format_state.update(error=str(e))
    finally:
        format_state['running'] = False