132 lines
4.0 KiB
Python
132 lines
4.0 KiB
Python
"""PiCopy – USB-Erkennung: usb_devices, usb_port_of, ensure_mount, cleanup_stale_mounts."""
|
||
|
||
import os
|
||
import re
|
||
import json
|
||
import subprocess
|
||
from pathlib import Path
|
||
|
||
from picopy.config import INTERNAL_DEST_DIR, log
|
||
|
||
|
||
# A USB port path segment has the form "<bus>-<port>[.<port>...]",
# e.g. "2-2" or "1-1.3". Interface segments such as "2-2:1.0" contain a
# colon and therefore never match.
_USB_PORT_SEGMENT = re.compile(r'\d+-[\d.]+')


def _usb_port_in(path):
    """Return the last '/'-separated segment of *path* that looks like a USB port, else None."""
    port = None
    for seg in path.split('/'):
        if _USB_PORT_SEGMENT.fullmatch(seg):
            # Keep overwriting: the last match in the path wins.
            port = seg
    return port


def usb_port_of(dev_name):
    """Return the physical USB port path of a block device (e.g. '2-2').

    The device path is obtained from ``udevadm info -q path`` first; if that
    fails or yields no port segment, the resolved ``/sys/block/<dev_name>``
    link is inspected instead. Both sources are scanned with the same
    pattern, so the fallback cannot report a purely numeric or dotted
    segment as a port.

    Args:
        dev_name: Kernel name of the block device without ``/dev/``
            (e.g. ``'sda'``).

    Returns:
        The last matching port segment as a string, or ``None`` when no
        port could be determined.
    """
    # Primary: udevadm.
    try:
        r = subprocess.run(
            ['udevadm', 'info', '-q', 'path', '-n', f'/dev/{dev_name}'],
            capture_output=True, text=True, timeout=5,
        )
        if r.returncode == 0:
            port = _usb_port_in(r.stdout.strip())
            if port:
                return port
    except Exception:
        # Deliberate best effort: a missing or hanging udevadm must not
        # prevent the sysfs fallback below from running.
        pass

    # Fallback: resolve the sysfs link and scan the real device path.
    try:
        return _usb_port_in(str(Path(f'/sys/block/{dev_name}').resolve()))
    except Exception:
        return None
|
||
|
||
|
||
def usb_devices():
    """Return one dict per USB partition (or whole disk) reported by lsblk.

    Runs ``lsblk -J`` and keeps only block devices whose transport is
    ``'usb'``. A disk with children yields one entry per child; a disk
    without children yields a single entry for the disk itself.

    Returns:
        A list of dicts with the keys ``device``, ``usb_port``, ``mount``,
        ``label`` and ``size``. The list is empty when lsblk fails or its
        output cannot be parsed (the error is logged).
    """
    cmd = ['lsblk', '-J', '-o', 'NAME,TRAN,MOUNTPOINT,LABEL,SIZE,MODEL']
    try:
        data = json.loads(subprocess.check_output(cmd, timeout=10, text=True))
    except Exception as e:
        log.error(f'lsblk: {e}')
        return []

    found = []
    for disk in data.get('blockdevices', []):
        if disk.get('tran') != 'usb':
            continue

        disk_name = disk['name']
        port = usb_port_of(disk_name)
        # Label used when a partition has none: disk label, then model, then name.
        fallback_label = (disk.get('label') or disk.get('model') or disk_name).strip()
        partitions = disk.get('children') or []

        if not partitions:
            # No partitions reported: expose the whole disk as the device.
            found.append({
                'device': f'/dev/{disk_name}',
                'usb_port': port,
                'mount': disk.get('mountpoint') or '',
                'label': fallback_label,
                'size': disk.get('size') or '',
            })
            continue

        for part in partitions:
            found.append({
                'device': f'/dev/{part["name"]}',
                'usb_port': port,
                'mount': part.get('mountpoint') or '',
                'label': (part.get('label') or fallback_label).strip(),
                'size': part.get('size') or disk.get('size') or '',
            })
    return found
|
||
|
||
|
||
def ensure_mount(dev_info):
    """Make sure the device described by *dev_info* is available at a mount point.

    Args:
        dev_info: Device dict. The keys read here are ``'internal'``,
            ``'mount'`` and ``'device'``.

    Returns:
        A ``(mount_point, mounted_here)`` tuple. ``mounted_here`` is ``True``
        only when this call ran ``mount`` itself; it is ``False`` for the
        internal destination and for devices that already have a mount
        point. On mount failure the result is ``(None, False)`` and the
        error is logged.
    """
    if dev_info.get('internal'):
        INTERNAL_DEST_DIR.mkdir(parents=True, exist_ok=True)
        return str(INTERNAL_DEST_DIR), False

    existing = dev_info.get('mount')
    if existing:
        return existing, False

    dev = dev_info['device']
    # '/dev/sdb1' becomes '/mnt/picopy_dev_sdb1'.
    mp = f'/mnt/picopy{dev.replace("/", "_")}'
    os.makedirs(mp, exist_ok=True)
    r = subprocess.run(['mount', dev, mp], capture_output=True)
    if r.returncode:
        # stderr is raw bytes and not guaranteed to be valid UTF-8; decode
        # leniently so reporting the failure cannot itself raise.
        log.error(f'mount failed: {r.stderr.decode(errors="replace")}')
        return None, False
    return mp, True
|
||
|
||
|
||
def cleanup_stale_mounts() -> None:
    """Lazily unmount leftover PiCopy mounts at startup (e.g. after a power loss).

    Reads ``/proc/mounts`` and runs ``umount -l`` for every entry whose
    mount point starts with ``/mnt/picopy``. Any failure while reading the
    table or starting ``umount`` is logged as a warning and not raised.
    """
    try:
        mount_points = []
        with open('/proc/mounts') as fh:
            for line in fh:
                fields = line.split()
                # The second field is the mount point. Match on it alone so
                # that a device path or option string that merely contains
                # '/mnt/picopy' does not get an unrelated mount unmounted.
                if len(fields) >= 2 and fields[1].startswith('/mnt/picopy'):
                    mount_points.append(fields[1])
        for mp in mount_points:
            log.info(f'Bereinige veralteten Mount: {mp}')
            subprocess.run(['umount', '-l', mp], capture_output=True)
    except Exception as e:
        log.warning(f'Stale-Mount-Bereinigung fehlgeschlagen: {e}')
|
||
|
||
|
||
def internal_dest_device(cfg=None):
    """Build the device-style dict that describes the internal destination.

    Args:
        cfg: Configuration mapping. When falsy, it is loaded via
            ``load_cfg()``.

    Returns:
        A dict with the keys ``device``, ``usb_port``, ``mount``, ``label``,
        ``size`` and ``internal``. ``size`` holds the formatted free space
        followed by ``' frei'``; ``label`` comes from the
        ``'internal_dest_label'`` config entry, with a built-in default.
    """
    from picopy.config import load_cfg, _fmt_bytes

    if not cfg:
        cfg = load_cfg()
    free_bytes = _internal_usage()['free']

    info = {'device': 'internal', 'usb_port': '__internal__'}
    info['mount'] = str(INTERNAL_DEST_DIR)
    info['label'] = cfg.get('internal_dest_label') or 'Interner Speicher'
    info['size'] = _fmt_bytes(free_bytes) + ' frei'
    info['internal'] = True
    return info
|
||
|
||
|
||
def _internal_usage():
    """Return disk usage figures for the internal destination directory.

    The directory is created first if it does not exist.

    Returns:
        A dict with ``path`` (the directory as a string) and the ``total``,
        ``used`` and ``free`` values reported by ``shutil.disk_usage``.
    """
    import shutil

    INTERNAL_DEST_DIR.mkdir(parents=True, exist_ok=True)
    total, used, free = shutil.disk_usage(INTERNAL_DEST_DIR)
    return {
        'path': str(INTERNAL_DEST_DIR),
        'total': total,
        'used': used,
        'free': free,
    }
|