223 lines
7.2 KiB
Python
223 lines
7.2 KiB
Python
"""PiCopy – Systeminfo, Format-Drives, Update-System."""
|
||
|
||
import os
|
||
import shutil
|
||
import subprocess
|
||
import threading
|
||
import time
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
import urllib.request as _urlreq
|
||
|
||
from picopy.config import BASE_DIR, RAW_BASE, VERSION, _atomic_write, log
|
||
from picopy.state import copy_state
|
||
|
||
# Shared status of the update check. check_for_updates() takes
# ``update_lock`` around every read and write of this dict.
update_state = dict(
    current=VERSION,     # version constant imported from picopy.config
    latest=None,         # version string last read from the remote version.txt
    available=False,     # True when ``latest`` compares newer than ``current``
    checking=False,      # True while a check is in flight
    error=None,          # str() of the exception from the last failed check
    last_checked=None,   # ISO timestamp written after a successful check
)
update_lock = threading.Lock()
|
||
|
||
# Progress record of the most recent format job; do_format() resets it at
# the start of every run.
format_state = dict(
    running=False,   # True while do_format() is executing
    error=None,      # error message if the job failed
    done=False,      # True once the mkfs command exited with status 0
    fs='',           # FORMAT_FILESYSTEMS key of the requested filesystem
    device='',       # device path handed to do_format()
)
|
||
|
||
# Filesystems that do_format() can create, keyed by an internal identifier.
# Each entry provides:
#   label - short display name
#   desc  - human-readable description (German UI text)
#   cmd   - callable (dev, name) -> argv list for the matching mkfs tool
#   pkg   - package named in the "apt install" hint when the tool is missing
# Every ``cmd`` truncates the volume label to what its filesystem can store,
# so an over-long name cannot make mkfs fail.
FORMAT_FILESYSTEMS = {
    'exfat': {
        'label': 'exFAT',
        'desc': 'Empfohlen – Mac & Windows, keine 4-GB-Dateigrößenbeschränkung',
        # exFAT volume labels hold at most 11 characters.
        'cmd': lambda dev, name: ['mkfs.exfat', '-n', name[:11], dev],
        'pkg': 'exfatprogs',
    },
    'fat32': {
        'label': 'FAT32',
        'desc': 'Mac & Windows, max. 4 GB pro Datei',
        # FAT volume labels hold at most 11 characters.
        'cmd': lambda dev, name: ['mkfs.vfat', '-F', '32', '-n', name[:11], dev],
        'pkg': 'dosfstools',
    },
    'ntfs': {
        'label': 'NTFS',
        'desc': 'Windows nativ, Mac nur lesen',
        # Label is capped at 32 characters.
        'cmd': lambda dev, name: ['mkfs.ntfs', '-f', '-L', name[:32], dev],
        'pkg': 'ntfs-3g',
    },
}
|
||
|
||
# Every file that an update has to download, as a path relative to the
# installation directory. install_update() walks this list in order.
UPDATE_FILES = [
    'app.py',
    'version.txt',
    'PiCopy_Logo.png',
    *(f'picopy/{module}.py' for module in (
        '__init__', 'config', 'state', 'usb', 'copy_engine',
        'wifi', 'wireguard', 'samba', 'upload', 'system',
    )),
    *(f'routes/{module}.py' for module in (
        '__init__', 'copy_routes', 'wifi_routes', 'wireguard_routes',
        'upload_routes', 'system_routes', 'browse_routes',
    )),
    'templates/index.html',
]
|
||
|
||
|
||
def get_sysinfo() -> dict:
    """Collect CPU temperature, RAM usage and root-filesystem usage.

    Each group is read independently; when a group cannot be read, its keys
    are present with the value ``None``.

    Returns:
        A dict with the keys ``cpu_temp`` (sysfs reading divided by 1000,
        one decimal), ``ram_total`` / ``ram_used`` (/proc/meminfo values
        divided by 1024), ``ram_pct``, ``disk_total`` / ``disk_used``
        (bytes divided by 1e9, one decimal) and ``disk_pct``.
    """
    stats: dict = {}

    # CPU temperature (Raspberry Pi): the first thermal zone that yields a
    # parsable value wins; None when neither does.
    stats['cpu_temp'] = None
    for sensor in ('/sys/class/thermal/thermal_zone0/temp',
                   '/sys/class/thermal/thermal_zone1/temp'):
        try:
            reading = int(Path(sensor).read_text().strip())
        except Exception:
            continue
        stats['cpu_temp'] = round(reading / 1000, 1)
        break

    # RAM: "used" is MemTotal minus MemAvailable.
    try:
        meminfo = {
            fields[0].rstrip(':'): int(fields[1])
            for fields in map(str.split,
                              Path('/proc/meminfo').read_text().splitlines())
            if len(fields) >= 2
        }
        mem_total = meminfo.get('MemTotal', 0)
        mem_used = mem_total - meminfo.get('MemAvailable', 0)
        stats.update(
            ram_total=round(mem_total / 1024),
            ram_used=round(mem_used / 1024),
            ram_pct=round(mem_used / mem_total * 100) if mem_total else 0,
        )
    except Exception:
        stats.update(dict.fromkeys(('ram_total', 'ram_used', 'ram_pct')))

    # SD card (root filesystem)
    try:
        usage = shutil.disk_usage('/')
        stats.update(
            disk_total=round(usage.total / 1e9, 1),
            disk_used=round(usage.used / 1e9, 1),
            disk_pct=round(usage.used / usage.total * 100) if usage.total else 0,
        )
    except Exception:
        stats.update(dict.fromkeys(('disk_total', 'disk_used', 'disk_pct')))

    return stats
|
||
|
||
|
||
def _vtuple(v):
|
||
try:
|
||
return tuple(int(x) for x in v.strip().lstrip('v').split('.'))
|
||
except Exception:
|
||
return (0,)
|
||
|
||
|
||
def check_for_updates():
    """Fetch the remote ``version.txt`` and record the result in ``update_state``.

    Returns immediately when another check is already in flight. On success
    ``latest``, ``available`` and ``last_checked`` are updated; on any
    failure the exception text is stored in ``update_state['error']`` and a
    warning is logged. The ``checking`` flag is always cleared at the end.
    Nothing is raised to the caller.
    """
    with update_lock:
        if update_state['checking']:
            return
        update_state['checking'] = True
        update_state['error'] = None

    try:
        # The context manager closes the HTTP response even when read() or
        # decode() fails, so no connection is leaked per check.
        with _urlreq.urlopen(f'{RAW_BASE}/version.txt', timeout=10) as resp:
            latest = resp.read().decode().strip()
        avail = _vtuple(latest) > _vtuple(VERSION)
        with update_lock:
            update_state.update(latest=latest, available=avail,
                                last_checked=datetime.now().isoformat())
        if avail:
            log.info(f'Update verfügbar: {VERSION} -> {latest}')
    except Exception as e:
        with update_lock:
            update_state['error'] = str(e)
        log.warning(f'Update-Check fehlgeschlagen: {e}')
    finally:
        with update_lock:
            update_state['checking'] = False
|
||
|
||
|
||
def update_check_loop():
    """Run check_for_updates() shortly after start and then every six hours.

    Never returns; intended to be run in a background thread.
    """
    startup_delay_s = 5            # check once shortly after start
    recheck_interval_s = 6 * 3600  # then every 6 hours

    time.sleep(startup_delay_s)
    while True:
        check_for_updates()
        time.sleep(recheck_interval_s)
|
||
|
||
|
||
def _fetch_update_file(rel_path: str, timeout: float) -> bytes:
    """Download ``RAW_BASE/rel_path`` and return its raw bytes, closing the response."""
    with _urlreq.urlopen(f'{RAW_BASE}/{rel_path}', timeout=timeout) as resp:
        return resp.read()


def _replace_file_atomically(dest: Path, data: bytes) -> None:
    """Write ``data`` to a sibling ``.tmp`` file, fsync it, then rename it over ``dest``."""
    tmp = dest.with_suffix(dest.suffix + '.tmp')
    with open(tmp, 'wb') as fh:
        fh.write(data)
        fh.flush()
        # Sync through the handle that wrote the data before the rename.
        os.fsync(fh.fileno())
    os.replace(tmp, dest)


def install_update():
    """Download every file in ``UPDATE_FILES``, validate it, install it, restart the service.

    The update runs in three phases so that a failure leaves the current
    installation untouched:

    1. Download all files into memory. Any download error aborts the update.
    2. Compile every ``.py`` file to catch syntax errors before anything is
       written.
    3. Replace the files on disk, each through a temp file plus rename.

    Each individual replacement is atomic; the set of files as a whole is
    not, so an I/O error during phase 3 can still leave a partial update.

    Raises:
        Exception: whatever the download, decode, compile or file operations
            raise; nothing is caught except to log a failed download.
    """
    log.info('Update wird heruntergeladen...')

    # Download timeouts in seconds; files not listed here get 60.
    timeouts = {'version.txt': 10, 'PiCopy_Logo.png': 30}

    # Phase 1: fetch everything before touching the installation.
    payloads = {}
    for rel_path in UPDATE_FILES:
        try:
            payloads[rel_path] = _fetch_update_file(
                rel_path, timeouts.get(rel_path, 60))
        except Exception as e:
            log.warning(f'Update: {rel_path} konnte nicht heruntergeladen werden: {e}')
            # Installing only the files that happened to download would mix
            # old and new modules, so give up instead of skipping.
            raise

    # Phase 2: reject the update if any Python source does not compile.
    for rel_path, data in payloads.items():
        if rel_path.endswith('.py'):
            compile(data.decode('utf-8'), rel_path, 'exec')

    # version.txt is stored stripped, with exactly one trailing newline.
    if 'version.txt' in payloads:
        new_version = payloads['version.txt'].decode().strip()
        payloads['version.txt'] = (new_version + '\n').encode('utf-8')

    # Phase 3: write all files.
    for rel_path, data in payloads.items():
        dest = BASE_DIR / rel_path
        dest.parent.mkdir(parents=True, exist_ok=True)
        _replace_file_atomically(dest, data)

    log.info('Update installiert - starte Dienst neu...')
    subprocess.Popen(['systemctl', 'restart', 'picopy'])
|
||
|
||
|
||
def do_format(fs: str, name: str, dev: str):
    """Format ``dev`` with filesystem ``fs`` and volume label ``name``.

    Meant to run in a worker thread: nothing is returned or raised; progress
    and outcome are reported through ``format_state`` (``running``,
    ``done``, ``error``).

    Args:
        fs: Key into ``FORMAT_FILESYSTEMS``.
        name: Volume label passed to the mkfs command builder.
        dev: Device path to unmount and format.
    """
    format_state.update(running=True, error=None, done=False, fs=fs, device=dev)
    try:
        # Resolve the filesystem first so an unknown key fails before the
        # device is unmounted.
        spec = FORMAT_FILESYSTEMS[fs]
        pkg = spec['pkg']
        missing_tool = f'Befehl nicht gefunden – bitte installieren: apt install {pkg}'

        # Unmount in case the device is mounted; the result is ignored.
        subprocess.run(['umount', dev], capture_output=True)

        cmd = spec['cmd'](dev, name)
        try:
            r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        except FileNotFoundError:
            # With an argv list (no shell) a missing mkfs binary raises here
            # instead of producing exit status 127, so give the install hint
            # from this branch.
            format_state.update(error=missing_tool)
            return

        if r.returncode != 0:
            err = r.stderr.strip() or r.stdout.strip() or 'Unbekannter Fehler'
            # Helpful message when the package is missing.
            if 'not found' in err or r.returncode == 127:
                err = missing_tool
            format_state.update(error=err)
            return

        format_state.update(done=True)
        log.info(f'Formatierung {fs} auf {dev} abgeschlossen')
    except subprocess.TimeoutExpired:
        format_state.update(error='Timeout – Formatierung dauerte zu lange')
    except Exception as e:
        format_state.update(error=str(e))
    finally:
        format_state['running'] = False
|