"""PiCopy – Blueprint: /api/browse, /api/history*, /api/internal-share*.""" import os import subprocess from datetime import datetime from pathlib import Path from flask import Blueprint, jsonify, request from picopy.config import load_cfg, HISTORY_FILE, INTERNAL_DEST_DIR, log from picopy.state import load_history from picopy.usb import usb_devices, internal_dest_device from picopy.samba import internal_share_update_state, set_internal_share_enabled browse_bp = Blueprint('browse', __name__) _browse_mounts = {} # usb_port -> mount_point def _mp_is_alive(mp): """Prüft ob ein Mount-Punkt wirklich aktiv und lesbar ist.""" try: with open('/proc/mounts') as f: mounted = any(mp in line.split() for line in f) if not mounted: return False os.listdir(mp) # I/O-Test: schlägt fehl wenn Gerät entfernt wurde return True except Exception: return False def _drop_browse_mount(port): """Veralteten Mount bereinigen.""" mp = _browse_mounts.pop(port, None) if mp: subprocess.run(['umount', '-l', mp], capture_output=True) log.info(f'Browse-Mount bereinigt: {mp}') def get_browse_mp(dev): if dev.get('internal'): INTERNAL_DEST_DIR.mkdir(parents=True, exist_ok=True) return str(INTERNAL_DEST_DIR) port = dev.get('usb_port', '') # Auto-mount vom System bevorzugen if dev.get('mount') and _mp_is_alive(dev['mount']): return dev['mount'] # Gecachten Mount prüfen mp = _browse_mounts.get(port) if mp: if _mp_is_alive(mp): return mp _drop_browse_mount(port) # veraltet -> aufräumen # Frisch mounten mp = f'/mnt/picopy_br_{port}' os.makedirs(mp, exist_ok=True) r = subprocess.run(['mount', dev['device'], mp], capture_output=True) if r.returncode == 0: _browse_mounts[port] = mp return mp return None @browse_bp.route('/api/browse') def r_browse(): port = request.args.get('port', '') rpath = request.args.get('path', '').lstrip('/') devs = usb_devices() dev = internal_dest_device(load_cfg()) if port == '__internal__' else None if dev is None: dev = next((d for d in devs if d['usb_port'] == port), None) if not dev: return jsonify(error='Gerät nicht verbunden - bitte neu einstecken'), 404 mp = get_browse_mp(dev) if not mp: return jsonify(error='Gerät nicht lesbar - bitte neu einstecken'), 500 try: base = Path(mp).resolve() target = (base / rpath).resolve() if not str(target).startswith(str(base)): return jsonify(error='Ungültiger Pfad'), 400 if not target.is_dir(): return jsonify(error='Kein Verzeichnis'), 400 entries = [] for item in sorted(target.iterdir(), key=lambda x: (x.is_file(), x.name.lower())): try: s = item.stat() entries.append({ 'name': item.name, 'dir': item.is_dir(), 'size': s.st_size if item.is_file() else None, 'mtime': datetime.fromtimestamp(s.st_mtime).strftime('%d.%m.%y %H:%M'), }) except OSError: pass rel = str(target.relative_to(base)) return jsonify(path='' if rel == '.' else rel, entries=entries) except OSError as e: import errno as _errno if e.errno == _errno.EIO: # I/O-Fehler = Gerät abgezogen, Mount bereinigen _drop_browse_mount(port) return jsonify(error='Gerät nicht mehr erreichbar - bitte neu einstecken'), 503 return jsonify(error=str(e)), 500 except Exception as e: return jsonify(error=str(e)), 500 @browse_bp.route('/api/history') def r_history(): return jsonify(load_history()) @browse_bp.route('/api/history', methods=['DELETE']) def r_history_clear(): try: HISTORY_FILE.write_text('[]', encoding='utf-8') except Exception: pass return jsonify(ok=True) @browse_bp.route('/api/internal-share/status') def r_internal_share_status(): return jsonify(internal_share_update_state()) @browse_bp.route('/api/internal-share', methods=['POST']) def r_internal_share_set(): data = request.get_json(force=True) or {} enabled = bool(data.get('enabled')) ok, err = set_internal_share_enabled(enabled) if not ok: return jsonify(error=err), 500 return jsonify(ok=True, status=internal_share_update_state())