"""PiCopy – Systeminfo, Format-Drives, Update-System.""" import os import shutil import subprocess import threading import time from datetime import datetime from pathlib import Path import urllib.request as _urlreq from picopy.config import BASE_DIR, RAW_BASE, VERSION, _atomic_write, log from picopy.state import copy_state update_state = { 'current': VERSION, 'latest': None, 'available': False, 'checking': False, 'error': None, 'last_checked': None, } update_lock = threading.Lock() format_state = {'running': False, 'error': None, 'done': False, 'fs': '', 'device': ''} FORMAT_FILESYSTEMS = { 'exfat': { 'label': 'exFAT', 'desc': 'Empfohlen – Mac & Windows, keine 4-GB-Dateigrößenbeschränkung', 'cmd': lambda dev, name: ['mkfs.exfat', '-n', name, dev], 'pkg': 'exfatprogs', }, 'fat32': { 'label': 'FAT32', 'desc': 'Mac & Windows, max. 4 GB pro Datei', 'cmd': lambda dev, name: ['mkfs.vfat', '-F', '32', '-n', name[:11], dev], 'pkg': 'dosfstools', }, 'ntfs': { 'label': 'NTFS', 'desc': 'Windows nativ, Mac nur lesen', 'cmd': lambda dev, name: ['mkfs.ntfs', '-f', '-L', name[:32], dev], 'pkg': 'ntfs-3g', }, } # Liste aller Dateien die beim Update heruntergeladen werden müssen UPDATE_FILES = [ 'app.py', 'version.txt', 'PiCopy_Logo.png', 'picopy/__init__.py', 'picopy/config.py', 'picopy/state.py', 'picopy/usb.py', 'picopy/copy_engine.py', 'picopy/wifi.py', 'picopy/wireguard.py', 'picopy/samba.py', 'picopy/upload.py', 'picopy/system.py', 'routes/__init__.py', 'routes/copy_routes.py', 'routes/wifi_routes.py', 'routes/wireguard_routes.py', 'routes/upload_routes.py', 'routes/system_routes.py', 'routes/browse_routes.py', 'templates/index.html', ] def get_sysinfo() -> dict: info: dict = {} # CPU-Temperatur (Raspberry Pi) for zone in ('/sys/class/thermal/thermal_zone0/temp', '/sys/class/thermal/thermal_zone1/temp'): try: raw = Path(zone).read_text().strip() info['cpu_temp'] = round(int(raw) / 1000, 1) break except Exception: info['cpu_temp'] = None # RAM try: mem: dict = {} for line in Path('/proc/meminfo').read_text().splitlines(): parts = line.split() if len(parts) >= 2: mem[parts[0].rstrip(':')] = int(parts[1]) total = mem.get('MemTotal', 0) avail = mem.get('MemAvailable', 0) used = total - avail info['ram_total'] = round(total / 1024) info['ram_used'] = round(used / 1024) info['ram_pct'] = round(used / total * 100) if total else 0 except Exception: info['ram_total'] = info['ram_used'] = info['ram_pct'] = None # SD-Karte (root-Dateisystem) try: du = shutil.disk_usage('/') info['disk_total'] = round(du.total / 1e9, 1) info['disk_used'] = round(du.used / 1e9, 1) info['disk_pct'] = round(du.used / du.total * 100) if du.total else 0 except Exception: info['disk_total'] = info['disk_used'] = info['disk_pct'] = None return info def _vtuple(v): try: return tuple(int(x) for x in v.strip().lstrip('v').split('.')) except Exception: return (0,) def check_for_updates(): with update_lock: if update_state['checking']: return update_state['checking'] = True update_state['error'] = None try: req = _urlreq.urlopen(f'{RAW_BASE}/version.txt', timeout=10) latest = req.read().decode().strip() avail = _vtuple(latest) > _vtuple(VERSION) with update_lock: update_state.update(latest=latest, available=avail, last_checked=datetime.now().isoformat()) if avail: log.info(f'Update verfügbar: {VERSION} -> {latest}') except Exception as e: with update_lock: update_state['error'] = str(e) log.warning(f'Update-Check fehlgeschlagen: {e}') finally: with update_lock: update_state['checking'] = False def update_check_loop(): time.sleep(5) # Kurz nach Start einmalig prüfen while True: check_for_updates() time.sleep(6 * 3600) # Dann alle 6 Stunden def install_update(): """Lädt alle Moduldateien herunter, prüft Syntax und ersetzt sie atomar.""" log.info('Update wird heruntergeladen...') # Zuerst version.txt holen und neuen Code validieren vreq = _urlreq.urlopen(f'{RAW_BASE}/version.txt', timeout=10) new_version = vreq.read().decode().strip() # app.py herunterladen und Syntax prüfen req = _urlreq.urlopen(f'{RAW_BASE}/app.py', timeout=60) new_app_code = req.read().decode() compile(new_app_code, 'app.py', 'exec') # Logo logo_req = _urlreq.urlopen(f'{RAW_BASE}/PiCopy_Logo.png', timeout=30) logo_data = logo_req.read() # Alle Dateien schreiben for rel_path in UPDATE_FILES: dest = BASE_DIR / rel_path dest.parent.mkdir(parents=True, exist_ok=True) url = f'{RAW_BASE}/{rel_path}' if rel_path == 'app.py': content_bytes = new_app_code.encode('utf-8') elif rel_path == 'version.txt': content_bytes = (new_version + '\n').encode('utf-8') elif rel_path == 'PiCopy_Logo.png': content_bytes = logo_data else: try: r = _urlreq.urlopen(url, timeout=60) content_bytes = r.read() except Exception as e: log.warning(f'Update: {rel_path} konnte nicht heruntergeladen werden: {e}') continue tmp = dest.with_suffix(dest.suffix + '.tmp') tmp.write_bytes(content_bytes) with open(tmp, 'rb') as fh: os.fsync(fh.fileno()) os.replace(str(tmp), str(dest)) log.info('Update installiert - starte Dienst neu...') subprocess.Popen(['systemctl', 'restart', 'picopy']) def do_format(fs: str, name: str, dev: str): """Formatiert ein Laufwerk. Wird in einem Thread ausgeführt.""" format_state.update(running=True, error=None, done=False, fs=fs, device=dev) try: # Aushängen falls gemountet subprocess.run(['umount', dev], capture_output=True) cmd = FORMAT_FILESYSTEMS[fs]['cmd'](dev, name) r = subprocess.run(cmd, capture_output=True, text=True, timeout=120) if r.returncode != 0: err = r.stderr.strip() or r.stdout.strip() or 'Unbekannter Fehler' # Hilfreiche Meldung wenn Paket fehlt pkg = FORMAT_FILESYSTEMS[fs]['pkg'] if 'not found' in err or r.returncode == 127: err = f'Befehl nicht gefunden – bitte installieren: apt install {pkg}' format_state.update(error=err) return format_state.update(done=True) log.info(f'Formatierung {fs} auf {dev} abgeschlossen') except subprocess.TimeoutExpired: format_state.update(error='Timeout – Formatierung dauerte zu lange') except Exception as e: format_state.update(error=str(e)) finally: format_state['running'] = False