"""PiCopy – USB detection: usb_devices, usb_port_of, ensure_mount, cleanup_stale_mounts."""
import os
import re
import json
import subprocess
from pathlib import Path

from picopy.config import INTERNAL_DEST_DIR, log

# Path segment naming a physical USB port, e.g. '2-2' or '1-1.3'.
# Interface segments such as '2-2:1.0' contain ':' and therefore never match.
_USB_PORT_RE = re.compile(r'\d+-[\d.]+')

# Prefix of every mount point created by ensure_mount().
_MOUNT_PREFIX = '/mnt/picopy'


def _last_usb_port(path):
    """Return the last '/'-separated segment of *path* that looks like a USB port, or None."""
    port = None
    for seg in path.split('/'):
        if _USB_PORT_RE.fullmatch(seg):
            # Keep overwriting so the last (deepest) matching segment wins.
            port = seg
    return port


def usb_port_of(dev_name):
    """Return the physical USB port path of a block device (e.g. '2-2').

    The port is looked up primarily via ``udevadm``; if that fails or yields
    no port, the resolved ``/sys/block/<dev_name>`` path is inspected instead.

    Args:
        dev_name: Kernel device name without the ``/dev/`` prefix (e.g. 'sda').

    Returns:
        The last port-like path segment found, or None if none was found.
    """
    # Primary: udevadm (more reliable).
    try:
        r = subprocess.run(
            ['udevadm', 'info', '-q', 'path', '-n', f'/dev/{dev_name}'],
            capture_output=True, text=True, timeout=5
        )
        if r.returncode == 0:
            port = _last_usb_port(r.stdout.strip())
            if port:
                return port
    except Exception:
        # Deliberate best effort: any udevadm problem falls through to sysfs.
        pass

    # Fallback: resolve the sysfs symlink and scan it with the same pattern
    # as the udevadm path, so both lookups agree on what counts as a port.
    try:
        real = Path(f'/sys/block/{dev_name}').resolve()
        return _last_usb_port(str(real))
    except Exception:
        return None


def usb_devices():
    """List USB block devices as reported by ``lsblk``.

    Every child (partition) of a USB disk yields one entry; a disk without
    children yields a single entry for the disk itself.

    Returns:
        A list of dicts with the keys 'device', 'usb_port', 'mount', 'label'
        and 'size'. An empty list is returned (and an error logged) when
        ``lsblk`` fails or its output cannot be parsed.
    """
    try:
        out = subprocess.check_output(
            ['lsblk', '-J', '-o', 'NAME,TRAN,MOUNTPOINT,LABEL,SIZE,MODEL'],
            timeout=10, text=True
        )
        data = json.loads(out)
    except Exception as e:
        log.error(f'lsblk: {e}')
        return []

    result = []
    for bd in data.get('blockdevices', []):
        if bd.get('tran') != 'usb':
            continue
        name = bd['name']
        port = usb_port_of(name)
        # Display name of the disk: label, else model, else the device name.
        model = (bd.get('label') or bd.get('model') or name).strip()
        children = bd.get('children') or []
        if children:
            for child in children:
                result.append({
                    'device': f'/dev/{child["name"]}',
                    'usb_port': port,
                    'mount': child.get('mountpoint') or '',
                    'label': (child.get('label') or model).strip(),
                    'size': child.get('size') or bd.get('size') or '',
                })
        else:
            result.append({
                'device': f'/dev/{name}',
                'usb_port': port,
                'mount': bd.get('mountpoint') or '',
                'label': model,
                'size': bd.get('size') or '',
            })
    return result


def ensure_mount(dev_info):
    """Make sure the device described by *dev_info* has a usable mount point.

    Args:
        dev_info: Device dict; reads the keys 'internal', 'mount' and 'device'.

    Returns:
        A tuple ``(mount_point, mounted_here)``. ``mounted_here`` is True only
        when this call performed the mount itself. On mount failure the
        result is ``(None, False)`` and the error is logged.
    """
    if dev_info.get('internal'):
        INTERNAL_DEST_DIR.mkdir(parents=True, exist_ok=True)
        return str(INTERNAL_DEST_DIR), False

    mp = dev_info.get('mount')
    if mp:
        # Already mounted elsewhere; nothing for us to undo later.
        return mp, False

    dev = dev_info['device']
    mp = f'{_MOUNT_PREFIX}{dev.replace("/", "_")}'
    os.makedirs(mp, exist_ok=True)
    r = subprocess.run(['mount', dev, mp], capture_output=True)
    if r.returncode:
        # errors="replace": stderr may not be valid UTF-8, and a decode error
        # must not turn a logged mount failure into an exception.
        log.error(f'mount failed: {r.stderr.decode(errors="replace")}')
        return None, False
    return mp, True


def cleanup_stale_mounts() -> None:
    """Lazily unmount leftover PiCopy mounts at startup (e.g. after a power loss).

    Reads ``/proc/mounts`` and runs ``umount -l`` on every mount point that
    starts with the PiCopy mount prefix. Failures are logged as warnings and
    never raised.
    """
    try:
        mps = []
        with open('/proc/mounts') as fh:
            for line in fh:
                fields = line.split()
                # Field 1 is the mount point. Only it is tested, so a device
                # or option string that merely mentions the prefix does not
                # get an unrelated mount point unmounted.
                if len(fields) >= 2 and fields[1].startswith(_MOUNT_PREFIX):
                    mps.append(fields[1])
        for mp in mps:
            log.info(f'Bereinige veralteten Mount: {mp}')
            subprocess.run(['umount', '-l', mp], capture_output=True)
    except Exception as e:
        log.warning(f'Stale-Mount-Bereinigung fehlgeschlagen: {e}')


def internal_dest_device(cfg=None):
    """Build the device dict describing the internal storage destination.

    Args:
        cfg: Configuration mapping; when falsy it is loaded via ``load_cfg()``.

    Returns:
        A dict shaped like the entries of ``usb_devices()`` plus
        ``'internal': True``; 'size' holds the formatted free space.
    """
    # Imported here rather than at module level, as in the original code.
    from picopy.config import load_cfg, _fmt_bytes

    cfg = cfg or load_cfg()
    usage = _internal_usage()
    return {
        'device': 'internal',
        'usb_port': '__internal__',
        'mount': str(INTERNAL_DEST_DIR),
        'label': cfg.get('internal_dest_label') or 'Interner Speicher',
        'size': _fmt_bytes(usage['free']) + ' frei',
        'internal': True,
    }


def _internal_usage():
    """Return disk usage of the internal destination directory.

    The directory is created if it does not exist yet.

    Returns:
        A dict with 'path' plus the 'total', 'used' and 'free' values taken
        from ``shutil.disk_usage``.
    """
    import shutil

    INTERNAL_DEST_DIR.mkdir(parents=True, exist_ok=True)
    usage = shutil.disk_usage(INTERNAL_DEST_DIR)
    return {
        'path': str(INTERNAL_DEST_DIR),
        'total': usage.total,
        'used': usage.used,
        'free': usage.free,
    }