#!/usr/bin/env python3 """PiCopy - Automatischer USB-Kopierdienst mit Web-Interface""" import os import re import json import shutil import logging import threading import subprocess from datetime import datetime from pathlib import Path from flask import Flask, jsonify, request app = Flask(__name__) BASE_DIR = Path('/opt/picopy') CONFIG_FILE = BASE_DIR / 'config.json' LOG_DIR = BASE_DIR / 'logs' LOG_FILE = LOG_DIR / 'picopy.log' LOG_DIR.mkdir(parents=True, exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()] ) log = logging.getLogger('picopy') DEFAULT_CONFIG = { 'source_port': None, 'dest_port': None, 'folder_format': '%Y-%m-%d', 'add_time': True, 'subfolder': True, 'auto_copy': True, } state = { 'running': False, 'progress': 0, 'total': 0, 'done': 0, 'current': '', 'error': None, 'last_copy': None, 'logs': [], } lock = threading.Lock() def load_cfg(): cfg = DEFAULT_CONFIG.copy() try: if CONFIG_FILE.exists(): cfg.update(json.loads(CONFIG_FILE.read_text())) except Exception: pass return cfg def save_cfg(cfg): CONFIG_FILE.write_text(json.dumps(cfg, indent=2)) def usb_port_of(dev_name): try: real = Path(f'/sys/block/{dev_name}').resolve() port = None for seg in str(real).split('/'): if re.fullmatch(r'\d+[\-\d.]+', seg) and ':' not in seg: port = seg return port except Exception: return None def usb_devices(): try: out = subprocess.check_output( ['lsblk', '-J', '-o', 'NAME,TRAN,MOUNTPOINT,LABEL,SIZE,MODEL'], timeout=10, text=True ) data = json.loads(out) except Exception as e: log.error(f'lsblk: {e}') return [] result = [] for bd in data.get('blockdevices', []): if bd.get('tran') != 'usb': continue name = bd['name'] port = usb_port_of(name) model = (bd.get('label') or bd.get('model') or name).strip() children = bd.get('children') or [] if children: for child in children: result.append({ 'device': f'/dev/{child["name"]}', 'usb_port': port, 'mount': child.get('mountpoint') or '', 'label': (child.get('label') or model).strip(), 'size': child.get('size') or bd.get('size') or '', }) else: result.append({ 'device': f'/dev/{name}', 'usb_port': port, 'mount': bd.get('mountpoint') or '', 'label': model, 'size': bd.get('size') or '', }) return result def ensure_mount(dev_info): mp = dev_info.get('mount') if mp: return mp, False dev = dev_info['device'] mp = f'/mnt/picopy{dev.replace("/", "_")}' os.makedirs(mp, exist_ok=True) r = subprocess.run(['mount', dev, mp], capture_output=True) if r.returncode: log.error(f'mount failed: {r.stderr.decode()}') return None, False return mp, True def add_log(msg): log.info(msg) with lock: state['logs'].append({'t': datetime.now().strftime('%H:%M:%S'), 'm': msg}) state['logs'] = state['logs'][-100:] def do_copy(src_dev, dst_dev, cfg): src_mp = dst_mp = None src_owned = dst_owned = False try: with lock: state.update(running=True, progress=0, error=None, done=0, total=0, logs=[], current='') add_log('Kopiervorgang gestartet') src_mp, src_owned = ensure_mount(src_dev) if not src_mp: raise RuntimeError(f'Quelle nicht mountbar: {src_dev["device"]}') add_log(f'Quelle: {src_mp} ({src_dev["label"]})') dst_mp, dst_owned = ensure_mount(dst_dev) if not dst_mp: raise RuntimeError(f'Ziel nicht mountbar: {dst_dev["device"]}') add_log(f'Ziel: {dst_mp} ({dst_dev["label"]})') ts = datetime.now() date_str = ts.strftime(cfg['folder_format']) if cfg.get('add_time'): date_str += '_' + ts.strftime('%H%M%S') label = re.sub(r'[^\w\-]', '_', src_dev.get('label', 'source')) dst_dir = Path(dst_mp) / date_str if cfg.get('subfolder'): dst_dir = dst_dir / label dst_dir.mkdir(parents=True, exist_ok=True) add_log(f'Zielordner: {dst_dir}') src_path = Path(src_mp) files = [f for f in src_path.rglob('*') if f.is_file()] total = len(files) with lock: state['total'] = total add_log(f'{total} Dateien gefunden') for i, f in enumerate(files): with lock: if not state['running']: add_log('Abgebrochen') return dst_f = dst_dir / f.relative_to(src_path) dst_f.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(f, dst_f) with lock: state.update( done=i + 1, progress=int((i + 1) / total * 100) if total else 100, current=str(f.name) ) with lock: state['last_copy'] = datetime.now().isoformat() add_log(f'Fertig! {total} Dateien kopiert') except Exception as e: log.exception('Copy failed') with lock: state['error'] = str(e) add_log(f'Fehler: {e}') finally: if src_owned and src_mp: subprocess.run(['umount', src_mp], capture_output=True) if dst_owned and dst_mp: subprocess.run(['umount', dst_mp], capture_output=True) with lock: state['running'] = False state['current'] = '' def check_auto_copy(): cfg = load_cfg() if not cfg.get('auto_copy') or not cfg.get('source_port') or not cfg.get('dest_port'): return with lock: if state['running']: return devs = usb_devices() src = next((d for d in devs if d['usb_port'] == cfg['source_port']), None) dst = next((d for d in devs if d['usb_port'] == cfg['dest_port']), None) if src and dst: log.info('Auto-Copy: beide Geräte verbunden') threading.Thread(target=do_copy, args=(src, dst, cfg), daemon=True).start() def usb_monitor(): try: import pyudev ctx = pyudev.Context() mon = pyudev.Monitor.from_netlink(ctx) mon.filter_by(subsystem='block', device_type='disk') for dev in iter(mon.poll, None): if dev.action == 'add': log.info(f'USB eingesteckt: {dev.device_node}') threading.Timer(3.0, check_auto_copy).start() except ImportError: log.warning('pyudev nicht verfügbar – USB-Überwachung deaktiviert') except Exception as e: log.error(f'USB-Monitor Fehler: {e}') # ── HTML ─────────────────────────────────────────────────────────────────── HTML = r"""