#!/usr/bin/env python3 """PiCopy v2 - USB Copy Service mit WiFi-Fallback AP""" import os import re import json import shutil import logging import threading import subprocess import time import uuid as _uuid_mod import urllib.request as _urlreq import urllib.error as _urlerr from datetime import datetime from pathlib import Path from flask import Flask, jsonify, request app = Flask(__name__) RAW_BASE = 'https://git.leuschner.dev/Tobias/PiCopy/raw/branch/main' VERSION_FILE = Path(__file__).with_name('version.txt') def load_installed_version(): try: return VERSION_FILE.read_text(encoding='utf-8').strip() or '1.0.4' except Exception: return 'X.X.X' VERSION = load_installed_version() BASE_DIR = Path('/opt/picopy') CONFIG_FILE = BASE_DIR / 'config.json' STATE_FILE = BASE_DIR / 'state.json' LOG_DIR = BASE_DIR / 'logs' LOG_FILE = LOG_DIR / 'picopy.log' LOG_DIR.mkdir(parents=True, exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()] ) log = logging.getLogger('picopy') NM_AP_CON = 'PiCopy-AP' NM_CLIENT_CON = 'PiCopy-WiFi' WIFI_BOOT_WAIT = 25 # Sekunden warten beim Start bevor AP gestartet wird DEFAULT_CONFIG = { # USB 'source_ports': [], # [{port, label}, ...] 'source_port': None, 'source_label': '', # Migration legacy 'dest_port': None, 'dest_label': '', 'folder_format': '%Y-%m-%d', 'add_time': True, 'subfolder': True, 'auto_copy': True, 'file_filter': '', 'exclude_system': True, 'duplicate_handling': 'skip', 'verify_checksum': False, 'delete_source': False, # WiFi 'wifi_ssid': '', 'wifi_password': '', 'ap_ssid': 'PiCopy', 'ap_password': 'PiCopy,', # WireGuard 'wireguard_auto': False, } # -- Persistenter Kopierstatus ----------------------------------------------- copy_state = { 'running': False, 'progress': 0, 'total': 0, 'done': 0, 'current': '', 'error': None, 'last_copy': None, 'logs': [], 'bytes_total': 0, 'bytes_done': 0, 'start_ts': None, 'eta_sec': None, 'speed_bps': 0, 'phase': 'idle', } copy_lock = threading.Lock() _copy_thread: threading.Thread | None = None def load_state(): global copy_state try: if STATE_FILE.exists(): saved = json.loads(STATE_FILE.read_text(encoding='utf-8')) saved['running'] = False saved['current'] = '' copy_state.update(saved) except (json.JSONDecodeError, ValueError) as e: log.warning(f'state.json korrupt ({e}), starte mit leerem Zustand') try: STATE_FILE.rename(STATE_FILE.with_suffix('.corrupt')) except Exception: pass except Exception as e: log.warning(f'state.json nicht lesbar: {e}') def save_state(): try: with copy_lock: data = dict(copy_state) _atomic_write(STATE_FILE, json.dumps(data)) except Exception: pass # -- WiFi Status ------------------------------------------------------------- wifi_state = { 'mode': 'unknown', # 'client' | 'ap' | 'disconnected' 'ssid': '', 'ip': '', } wifi_lock = threading.Lock() # -- Config ------------------------------------------------------------------- def load_cfg(): cfg = DEFAULT_CONFIG.copy() try: if CONFIG_FILE.exists(): cfg.update(json.loads(CONFIG_FILE.read_text(encoding='utf-8'))) except (json.JSONDecodeError, ValueError) as e: log.error(f'config.json korrupt ({e}), verwende Standardwerte') try: CONFIG_FILE.rename(CONFIG_FILE.with_suffix('.corrupt')) except Exception: pass except Exception as e: log.warning(f'config.json nicht lesbar: {e}') return cfg def save_cfg(cfg): _atomic_write(CONFIG_FILE, json.dumps(cfg, indent=2)) # -- WiFi Hilfsfunktionen ----------------------------------------------------- def nm(*args): return subprocess.run(['nmcli'] + list(args), capture_output=True, text=True, timeout=20) def get_wlan0_info(): r = nm('-t', '-f', 'DEVICE,STATE,CONNECTION', 'dev') for line in r.stdout.splitlines(): parts = line.split(':') if parts and parts[0] == 'wlan0': return { 'state': parts[1] if len(parts) > 1 else '', 'connection': ':'.join(parts[2:]) if len(parts) > 2 else '', } return {'state': '', 'connection': ''} def get_wifi_ip(): r = nm('-t', '-f', 'IP4.ADDRESS', 'dev', 'show', 'wlan0') for line in r.stdout.splitlines(): if 'IP4.ADDRESS' in line: ip = line.split(':')[-1].split('/')[0].strip() if ip: return ip return '' def is_client_connected(): info = get_wlan0_info() return (info['state'] == 'connected' and info['connection'] and NM_AP_CON not in info['connection']) def is_ap_active(): r = nm('-t', '-f', 'NAME,STATE', 'con', 'show', '--active') return any(NM_AP_CON in l and 'activated' in l for l in r.stdout.splitlines()) def start_ap(ssid, password): log.info(f'Starte AP: {ssid}') nm('con', 'delete', NM_AP_CON) time.sleep(1) r = nm('dev', 'wifi', 'hotspot', 'ifname', 'wlan0', 'ssid', ssid, 'password', password, 'con-name', NM_AP_CON) ok = r.returncode == 0 if ok: log.info('AP gestartet') else: log.error(f'AP Fehler: {r.stderr}') return ok def stop_ap(): log.info('Stoppe AP') nm('con', 'down', NM_AP_CON) def connect_client_wifi(ssid, password): log.info(f'Verbinde mit WiFi: {ssid}') # Bestehende PiCopy-WiFi Verbindung löschen nm('con', 'delete', NM_CLIENT_CON) time.sleep(1) r = nm('dev', 'wifi', 'connect', ssid, 'password', password, 'name', NM_CLIENT_CON, 'ifname', 'wlan0') ok = r.returncode == 0 if ok: log.info(f'Verbunden mit {ssid}') else: log.error(f'WiFi-Verbindung fehlgeschlagen: {r.stderr.strip()}') return ok def scan_wifi_networks(): nm('dev', 'wifi', 'rescan') time.sleep(2) r = nm('-t', '-f', 'SSID,SIGNAL,SECURITY', 'dev', 'wifi', 'list') seen, nets = set(), [] for line in r.stdout.splitlines(): parts = line.split(':') if len(parts) >= 2: ssid = parts[0].strip() signal = parts[1].strip() if len(parts) > 1 else '0' security = ':'.join(parts[2:]).strip() if len(parts) > 2 else '' if ssid and ssid not in seen: seen.add(ssid) nets.append({'ssid': ssid, 'signal': int(signal) if signal.isdigit() else 0, 'security': security}) return sorted(nets, key=lambda x: -x['signal']) # -- WiFi Monitor Thread ------------------------------------------------------- def update_wifi_state(): info = get_wlan0_info() if info['state'] == 'connected': if NM_AP_CON in info['connection']: with wifi_lock: wifi_state.update(mode='ap', ssid=load_cfg().get('ap_ssid', 'PiCopy'), ip='10.42.0.1') else: ip = get_wifi_ip() with wifi_lock: wifi_state.update(mode='client', ssid=info['connection'], ip=ip) else: with wifi_lock: wifi_state.update(mode='disconnected', ssid='', ip='') def wifi_monitor(): log.info(f'WiFi-Monitor: warte {WIFI_BOOT_WAIT}s auf Verbindung...') time.sleep(WIFI_BOOT_WAIT) while True: try: update_wifi_state() with wifi_lock: mode = wifi_state['mode'] if mode == 'disconnected': cfg = load_cfg() ssid = cfg.get('wifi_ssid', '') pw = cfg.get('wifi_password', '') connected = False if ssid: connected = connect_client_wifi(ssid, pw) if connected: time.sleep(5) update_wifi_state() if not connected: ap_ssid = cfg.get('ap_ssid', 'PiCopy') ap_pw = cfg.get('ap_password', 'PiCopy,') if start_ap(ap_ssid, ap_pw): time.sleep(3) with wifi_lock: wifi_state.update(mode='ap', ssid=ap_ssid, ip='10.42.0.1') except Exception as e: log.error(f'WiFi-Monitor Fehler: {e}') time.sleep(30) # -- WireGuard VPN ------------------------------------------------------------- WG_CONF = Path('/etc/wireguard/picopy.conf') WG_IFACE = 'picopy' def wg_is_installed(): return shutil.which('wg-quick') is not None wg_state = { 'connected': False, 'ip': '', 'peer': '', 'error': None, 'has_config': False, 'installed': False, 'pkg_running': False, 'pkg_action': '', 'pkg_error': None, } wg_lock = threading.Lock() def wg_update_state(): inst = wg_is_installed() has_conf = WG_CONF.exists() if not inst: with wg_lock: wg_state.update(installed=False, connected=False, ip='', peer='', has_config=has_conf) return r = subprocess.run(['wg', 'show', WG_IFACE], capture_output=True, text=True, timeout=5) if r.returncode != 0: with wg_lock: wg_state.update(installed=True, connected=False, ip='', peer='', has_config=has_conf) return ip_r = subprocess.run(['ip', '-4', 'addr', 'show', WG_IFACE], capture_output=True, text=True, timeout=5) ip = '' for line in ip_r.stdout.splitlines(): if line.strip().startswith('inet '): ip = line.strip().split()[1].split('/')[0] break peer = '' for line in r.stdout.splitlines(): if line.startswith('peer:'): peer = line.split(':', 1)[-1].strip() break with wg_lock: wg_state.update(installed=True, connected=True, ip=ip, peer=peer, error=None, has_config=has_conf) def wg_connect(): if not WG_CONF.exists(): with wg_lock: wg_state['error'] = 'Keine Konfiguration vorhanden' return False r = subprocess.run(['wg-quick', 'up', WG_IFACE], capture_output=True, text=True, timeout=30) if r.returncode == 0: time.sleep(1) wg_update_state() log.info('WireGuard verbunden') return True lines = r.stderr.strip().splitlines() if r.stderr.strip() else [] real_errors = [l for l in lines if not l.strip().startswith('[#]')] err = (real_errors[-1] if real_errors else lines[-1] if lines else 'Unbekannter Fehler') if 'resolvconf' in err and 'not found' in err: err = 'resolvconf fehlt - bitte WireGuard deinstallieren und neu installieren (openresolv wird dann mitinstalliert)' with wg_lock: wg_state.update(connected=False, error=err) log.error(f'WireGuard Fehler: {err}') return False def wg_disconnect(): r = subprocess.run(['wg-quick', 'down', WG_IFACE], capture_output=True, text=True, timeout=15) with wg_lock: wg_state.update(connected=False, ip='', peer='', error=None) log.info('WireGuard getrennt') return r.returncode == 0 def _wg_apt(action: str, packages: list): """Führt apt-get install/remove aus und aktualisiert pkg_state.""" with wg_lock: if wg_state['pkg_running']: return wg_state.update(pkg_running=True, pkg_action=action, pkg_error=None) try: cmd = ['apt-get', action, '-y'] + packages r = subprocess.run(cmd, capture_output=True, text=True, timeout=300, env={**os.environ, 'DEBIAN_FRONTEND': 'noninteractive'}) if r.returncode != 0: err = (r.stderr.strip().splitlines()[-1] if r.stderr.strip() else f'apt-get {action} fehlgeschlagen') log.error(f'WireGuard apt {action}: {err}') with wg_lock: wg_state['pkg_error'] = err else: log.info(f'WireGuard apt {action} abgeschlossen') except Exception as e: with wg_lock: wg_state['pkg_error'] = str(e) finally: with wg_lock: wg_state['pkg_running'] = False wg_state['pkg_action'] = '' wg_update_state() def wg_install(): _wg_apt('install', ['wireguard', 'wireguard-tools', 'openresolv']) def wg_uninstall(): wg_disconnect() _wg_apt('remove', ['wireguard', 'wireguard-tools']) def wg_save_config(content: str): try: WG_CONF.parent.mkdir(parents=True, exist_ok=True) WG_CONF.write_text(content, encoding='utf-8') WG_CONF.chmod(0o600) return True, '' except Exception as e: return False, str(e) def wg_monitor(): while True: try: wg_update_state() except Exception: pass time.sleep(10) # -- USB Geräteerkennung ------------------------------------------------------- def usb_port_of(dev_name): """Gibt den physischen USB-Port-Pfad zurück (z.B. '2-2'). Primär via udevadm, Fallback via sysfs.""" # Primär: udevadm (zuverlässiger) try: r = subprocess.run( ['udevadm', 'info', '-q', 'path', '-n', f'/dev/{dev_name}'], capture_output=True, text=True, timeout=5 ) if r.returncode == 0: port = None for seg in r.stdout.strip().split('/'): if re.fullmatch(r'\d+-[\d.]+', seg): port = seg if port: return port except Exception: pass # Fallback: sysfs readlink try: real = Path(f'/sys/block/{dev_name}').resolve() port = None for seg in str(real).split('/'): if re.fullmatch(r'\d+[\-\d.]+', seg) and ':' not in seg: port = seg return port except Exception: return None def usb_devices(): try: out = subprocess.check_output( ['lsblk', '-J', '-o', 'NAME,TRAN,MOUNTPOINT,LABEL,SIZE,MODEL'], timeout=10, text=True ) data = json.loads(out) except Exception as e: log.error(f'lsblk: {e}') return [] result = [] for bd in data.get('blockdevices', []): if bd.get('tran') != 'usb': continue name = bd['name'] port = usb_port_of(name) model = (bd.get('label') or bd.get('model') or name).strip() for child in (bd.get('children') or []): result.append({ 'device': f'/dev/{child["name"]}', 'usb_port': port, 'mount': child.get('mountpoint') or '', 'label': (child.get('label') or model).strip(), 'size': child.get('size') or bd.get('size') or '', }) if not bd.get('children'): result.append({ 'device': f'/dev/{name}', 'usb_port': port, 'mount': bd.get('mountpoint') or '', 'label': model, 'size': bd.get('size') or '', }) return result def ensure_mount(dev_info): mp = dev_info.get('mount') if mp: return mp, False dev = dev_info['device'] mp = f'/mnt/picopy{dev.replace("/","_")}' os.makedirs(mp, exist_ok=True) r = subprocess.run(['mount', dev, mp], capture_output=True) if r.returncode: log.error(f'mount failed: {r.stderr.decode()}') return None, False return mp, True # -- Kopier-Logik -------------------------------------------------------------- def add_log(msg): log.info(msg) with copy_lock: copy_state['logs'].append({'t': datetime.now().strftime('%H:%M:%S'), 'm': msg}) copy_state['logs'] = copy_state['logs'][-200:] import hashlib as _hashlib SYSTEM_EXCLUDES = { '.DS_Store', 'Thumbs.db', 'thumbs.db', 'desktop.ini', '.Spotlight-V100', '.Trashes', '.fseventsd', '.TemporaryItems', '.VolumeIcon.icns', 'RECYCLER', '$RECYCLE.BIN', 'System Volume Information', '.DocumentRevisions-V100', } def _should_copy(f: Path, cfg: dict) -> bool: if cfg.get('exclude_system'): for part in f.parts: if part in SYSTEM_EXCLUDES: return False if f.name.startswith('._'): return False filt = cfg.get('file_filter', '').strip() if filt: allowed = {e.strip().lower().lstrip('.') for e in filt.split(',') if e.strip()} if f.suffix.lower().lstrip('.') not in allowed: return False return True def _unique_path(p: Path) -> Path: stem, suffix, parent = p.stem, p.suffix, p.parent i = 1 while True: candidate = parent / f'{stem}_({i}){suffix}' if not candidate.exists(): return candidate i += 1 def _file_md5(p: Path) -> str: h = _hashlib.md5() with open(p, 'rb') as f: for chunk in iter(lambda: f.read(65536), b''): h.update(chunk) return h.hexdigest() def _atomic_write(path: Path, content: str) -> None: """Schreibt atomar: erst .tmp, dann os.replace() - sicher bei Stromausfall.""" tmp = path.with_suffix(path.suffix + '.tmp') try: tmp.write_text(content, encoding='utf-8') with open(tmp, 'rb') as fh: os.fsync(fh.fileno()) # Daten wirklich auf Datenträger schreiben os.replace(str(tmp), str(path)) # Atomares Umbenennen (POSIX-Garantie) except Exception: try: tmp.unlink(missing_ok=True) except Exception: pass raise def cleanup_stale_mounts() -> None: """Bereinigt beim Start hängen gebliebene PiCopy-Mounts (z.B. nach Stromausfall).""" try: with open('/proc/mounts') as fh: mps = [line.split()[1] for line in fh if '/mnt/picopy' in line] for mp in mps: log.info(f'Bereinige veralteten Mount: {mp}') subprocess.run(['umount', '-l', mp], capture_output=True) except Exception as e: log.warning(f'Stale-Mount-Bereinigung fehlgeschlagen: {e}') def _fmt_bytes(b): if b < 1024: return f'{b} B' if b < 1024**2: return f'{b/1024:.1f} KB' if b < 1024**3: return f'{b/1024**2:.1f} MB' return f'{b/1024**3:.2f} GB' def _resolve_source_ports(cfg) -> list: """Gibt source_ports als [{port, label}]-Liste zurück. Migriert altes source_port-Feld.""" ports = cfg.get('source_ports') or [] if not ports and cfg.get('source_port'): ports = [{'port': cfg['source_port'], 'label': cfg.get('source_label', '')}] return ports def do_copy(src_devs, dst_dev, cfg): """Kopiert von einer oder mehreren Quellen auf ein Ziel.""" dst_mp = None dst_owned = False src_mounts = [] # [(src_dev, src_mp, src_owned)] try: with copy_lock: copy_state.update(running=True, progress=0, error=None, done=0, total=0, logs=[], current='', bytes_total=0, bytes_done=0, start_ts=time.time(), eta_sec=None, speed_bps=0, phase='copy') save_state() n = len(src_devs) add_log(f'Kopiervorgang gestartet ({n} Quelle{"n" if n != 1 else ""})') dst_mp, dst_owned = ensure_mount(dst_dev) if not dst_mp: raise RuntimeError(f'Ziel nicht mountbar: {dst_dev["device"]}') add_log(f'Ziel: {dst_mp} ({dst_dev["label"]})') ts = datetime.now() date_str = ts.strftime(cfg['folder_format']) if cfg.get('add_time'): date_str += '_' + ts.strftime('%H%M%S') # -- Alle Quellen mounten & Dateien sammeln ------------------------- # source_data: [(src_dev, src_path, files, dst_dir, incomplete_marker)] source_data = [] total = 0 bytes_total = 0 for src_dev in src_devs: with copy_lock: cancelled = not copy_state['running'] if cancelled: add_log('Abgebrochen') return src_mp_i, src_owned_i = ensure_mount(src_dev) src_mounts.append((src_dev, src_mp_i, src_owned_i)) if not src_mp_i: add_log(f'Quelle nicht mountbar: {src_dev["device"]} - übersprungen') continue add_log(f'Quelle: {src_mp_i} ({src_dev["label"]})') src_path = Path(src_mp_i) all_files = [f for f in src_path.rglob('*') if f.is_file()] files = [f for f in all_files if _should_copy(f, cfg)] n_filtered = len(all_files) - len(files) if n_filtered: add_log(f'{n_filtered} Dateien gefiltert ({src_dev["label"]})') label = re.sub(r'[^\w\-]', '_', src_dev.get('label', 'source')) dst_dir_i = Path(dst_mp) / date_str if cfg.get('subfolder'): dst_dir_i = dst_dir_i / label dst_dir_i.mkdir(parents=True, exist_ok=True) add_log(f'Zielordner: {dst_dir_i}') for stale in dst_dir_i.rglob('*.picopy_tmp'): stale.unlink(missing_ok=True) incomplete_marker_i = dst_dir_i / '.picopy_incomplete' incomplete_marker_i.write_text(json.dumps({ 'started': datetime.now().isoformat(), 'source': src_dev.get('label', ''), })) total += len(files) bytes_total += sum(f.stat().st_size for f in files) source_data.append((src_dev, src_path, files, dst_dir_i, incomplete_marker_i)) with copy_lock: copy_state['total'] = total copy_state['bytes_total'] = bytes_total add_log(f'{total} Dateien gesamt ({_fmt_bytes(bytes_total)})') save_state() # -- Phase 1: Kopieren (alle Quellen) -------------------------------- dup_mode = cfg.get('duplicate_handling', 'skip') all_copied_pairs = [] skipped = 0 io_errors = 0 global_done = 0 for src_dev_i, src_path_i, files_i, dst_dir_i, _ in source_data: if len(src_devs) > 1: add_log(f'Kopiere: {src_dev_i["label"]}') for f in files_i: with copy_lock: cancelled = not copy_state['running'] if cancelled: add_log('Abgebrochen') return global_done += 1 rel = f.relative_to(src_path_i) dst_f = dst_dir_i / rel try: dst_f.parent.mkdir(parents=True, exist_ok=True) except OSError as mkdir_err: io_errors += 1 add_log(f'⚠ Verzeichnis nicht erstellbar ({dst_f.parent.name}): {mkdir_err}') with copy_lock: copy_state.update(done=global_done, progress=int(global_done/total*100) if total else 100, current=str(f.name)) continue if dst_f.exists(): if dup_mode == 'skip': if dst_f.stat().st_size == f.stat().st_size: skipped += 1 with copy_lock: copy_state.update(done=global_done, progress=int(global_done/total*100) if total else 100, current=str(f.name)) continue else: add_log(f'Unvollständige Datei, wird neu kopiert: {f.name}') elif dup_mode == 'rename': dst_f = _unique_path(dst_f) fsize = f.stat().st_size tmp_f = dst_f.with_name(dst_f.name + '.picopy_tmp') try: shutil.copy2(f, tmp_f) os.replace(str(tmp_f), str(dst_f)) except OSError as copy_err: try: tmp_f.unlink(missing_ok=True) except Exception: pass io_errors += 1 add_log(f'⚠ Fehler bei {f.name}: {copy_err}') with copy_lock: copy_state.update(done=global_done, progress=int(global_done/total*100) if total else 100, current=str(f.name)) continue all_copied_pairs.append((f, dst_f)) with copy_lock: copy_state['bytes_done'] += fsize bd = copy_state['bytes_done'] bt = copy_state['bytes_total'] elapsed = time.time() - copy_state['start_ts'] speed = bd / elapsed if elapsed > 1 else 0 eta = int((bt - bd) / speed) if speed > 0 and bt > bd else 0 copy_state.update(done=global_done, progress=int(global_done/total*100) if total else 100, current=str(f.name), speed_bps=int(speed), eta_sec=eta) if global_done % 20 == 0: save_state() msg_parts = [f'{len(all_copied_pairs)} kopiert'] if skipped: msg_parts.append(f'{skipped} übersprungen') if io_errors: msg_parts.append(f'{io_errors} Fehler (I/O)') # -- Phase 2: Verifizieren ------------------------------------------ verify_errors = 0 verified_pairs = list(all_copied_pairs) if cfg.get('verify_checksum') and all_copied_pairs: with copy_lock: copy_state.update(phase='verify', progress=0, done=0, total=len(all_copied_pairs), current='', eta_sec=None, speed_bps=0) add_log(f'Verifiziere {len(all_copied_pairs)} Dateien...') verified_pairs = [] for i, (src_f, dst_f) in enumerate(all_copied_pairs): with copy_lock: cancelled = not copy_state['running'] if not cancelled: copy_state.update(done=i+1, progress=int((i+1)/len(all_copied_pairs)*100), current=src_f.name) if cancelled: add_log('Abgebrochen') return if _file_md5(src_f) == _file_md5(dst_f): verified_pairs.append((src_f, dst_f)) else: verify_errors += 1 add_log(f'⚠ Prüfsummenfehler: {src_f.name}') try: dst_f.unlink() except Exception: pass if verify_errors: msg_parts.append(f'{verify_errors} Prüfsummenfehler!') add_log(f'Verifizierung: {verify_errors} Fehler!') else: add_log(f'Alle {len(verified_pairs)} Dateien verifiziert ✓') # -- Phase 3: Quelle löschen ---------------------------------------- if cfg.get('delete_source') and verified_pairs: if verify_errors: add_log('Quelldateien NICHT gelöscht (Prüfsummenfehler)') else: with copy_lock: copy_state.update(phase='delete', current='') add_log(f'Lösche {len(verified_pairs)} Quelldateien...') del_errors = 0 for src_f, _ in verified_pairs: try: src_f.unlink() except Exception as e: del_errors += 1 log.warning(f'Löschen fehlgeschlagen: {src_f}: {e}') if del_errors: msg_parts.append(f'{del_errors} Löschfehler') else: add_log('Quelle geleert ✓') subprocess.run(['sync'], capture_output=True) for _, _, _, _, incomplete_marker_i in source_data: try: incomplete_marker_i.unlink(missing_ok=True) except Exception: pass with copy_lock: copy_state['last_copy'] = datetime.now().isoformat() add_log('Fertig! ' + ', '.join(msg_parts)) dst_dir_root = Path(dst_mp) / date_str threading.Thread(target=run_uploads, args=(dst_dir_root, cfg), daemon=True).start() except Exception as e: log.exception('Copy failed') with copy_lock: copy_state['error'] = str(e) add_log(f'Fehler: {e}') finally: subprocess.run(['sync'], capture_output=True) for _, src_mp_i, src_owned_i in src_mounts: if src_owned_i and src_mp_i: subprocess.run(['umount', src_mp_i], capture_output=True) if dst_owned and dst_mp: subprocess.run(['umount', dst_mp], capture_output=True) with copy_lock: copy_state['running'] = False copy_state['current'] = '' copy_state['phase'] = 'idle' save_state() def check_auto_copy(): cfg = load_cfg() src_ports = _resolve_source_ports(cfg) if not cfg.get('auto_copy') or not src_ports or not cfg.get('dest_port'): return with copy_lock: if copy_state['running'] or copy_state['error']: return devs = usb_devices() srcs = [next((d for d in devs if d['usb_port'] == sp['port']), None) for sp in src_ports] srcs = [s for s in srcs if s is not None] dst = next((d for d in devs if d['usb_port'] == cfg['dest_port']), None) if srcs and dst: log.info(f'Auto-Copy: {len(srcs)} Quelle(n) und Ziel verbunden') threading.Thread(target=do_copy, args=(srcs, dst, cfg), daemon=True).start() def usb_monitor(): try: import pyudev ctx = pyudev.Context() mon = pyudev.Monitor.from_netlink(ctx) mon.filter_by(subsystem='block', device_type='disk') for dev in iter(mon.poll, None): if dev.action == 'add': log.info(f'USB eingesteckt: {dev.device_node}') threading.Timer(3.0, check_auto_copy).start() except ImportError: log.warning('pyudev nicht verfügbar') # -- Upload-Ziele (rclone) ----------------------------------------------------- RCLONE_CONF = BASE_DIR / 'rclone.conf' upload_state = { 'running': False, 'current': '', 'results': [], } upload_lock = threading.Lock() def _rclone(*args, timeout=60): return subprocess.run( ['rclone', '--config', str(RCLONE_CONF)] + list(args), capture_output=True, text=True, timeout=timeout ) def _rclone_obscure(pw): r = subprocess.run(['rclone', 'obscure', pw], capture_output=True, text=True, timeout=10) return r.stdout.strip() def _remote_name(tid): return f'picopy_{tid}' def configure_smb_remote(tid, host, share, user, pw): rn = _remote_name(tid) _rclone('config', 'delete', rn) args = ['config', 'create', rn, 'smb', f'host={host}', f'share={share}'] if user: args += [f'user={user}'] if pw: args += [f'pass={_rclone_obscure(pw)}'] r = _rclone(*args) return r.returncode == 0, r.stderr.strip() def delete_remote(tid): _rclone('config', 'delete', _remote_name(tid)) def test_remote(tid): r = _rclone('lsd', f'{_remote_name(tid)}:', timeout=20) return r.returncode == 0, r.stderr.strip() def run_uploads(local_dir: Path, cfg: dict): """Lädt local_dir zu allen aktiven Fernzielen hoch. Läuft im Background-Thread.""" targets = [t for t in cfg.get('upload_targets', []) if t.get('enabled')] if not targets: return with upload_lock: upload_state.update(running=True, results=[], current='') for t in targets: name = t.get('name', t['id']) with upload_lock: upload_state['current'] = name add_log(f'Upload >> {name}...') dest_root = t.get('dest_path', 'PiCopy').strip('/') dest = f'{_remote_name(t["id"])}:{dest_root}' r = _rclone('copy', str(local_dir), dest, '--create-empty-src-dirs', '--retries', '3', timeout=7200) ok = r.returncode == 0 err = (r.stderr.strip().splitlines()[-1] if r.stderr.strip() else '') if not ok else '' with upload_lock: upload_state['results'].append({'name': name, 'ok': ok, 'msg': err}) add_log(f'Upload {name}: {"✓ OK" if ok else "✗ Fehler - " + err}') with upload_lock: upload_state['running'] = False upload_state['current'] = '' # -- Flask Routes -------------------------------------------------------------- @app.route('/') def index(): return HTML.replace('__PICOPY_VERSION__', VERSION) @app.route('/api/devices') def r_devices(): return jsonify(usb_devices()) @app.route('/api/config', methods=['GET', 'POST']) def r_config(): if request.method == 'POST': cfg = load_cfg() cfg.update(request.get_json(force=True)) save_cfg(cfg) return jsonify(ok=True) return jsonify(load_cfg()) @app.route('/api/status') def r_status(): with copy_lock: cs = dict(copy_state) with wifi_lock: ws = dict(wifi_state) with wg_lock: wgs = dict(wg_state) return jsonify(copy=cs, wifi=ws, vpn=wgs) @app.route('/api/copy/start', methods=['POST']) def r_start(): global _copy_thread with copy_lock: if copy_state['running']: return jsonify(error='Bereits aktiv'), 400 if _copy_thread is not None and _copy_thread.is_alive(): return jsonify(error='Abbruch wird noch abgeschlossen - bitte kurz warten und erneut versuchen.'), 400 cfg = load_cfg() devs = usb_devices() body = request.get_json(force=True) or {} wanted_ports = body.get('ports') # None = alle konfigurierten Quellen src_ports = _resolve_source_ports(cfg) srcs = [next((d for d in devs if d['usb_port'] == sp['port']), None) for sp in src_ports] srcs = [s for s in srcs if s is not None] if wanted_ports is not None: srcs = [s for s in srcs if s['usb_port'] in wanted_ports] if not srcs: return jsonify(error='Keine Quellgeräte gefunden (Ports nicht verbunden)'), 400 dst = next((d for d in devs if d['usb_port'] == cfg.get('dest_port')), None) if not dst: return jsonify(error='Zielgerät nicht gefunden'), 400 _copy_thread = threading.Thread(target=do_copy, args=(srcs, dst, cfg), daemon=True) _copy_thread.start() return jsonify(ok=True) @app.route('/api/copy/cancel', methods=['POST']) def r_cancel(): with copy_lock: copy_state['running'] = False return jsonify(ok=True) @app.route('/api/wifi/scan') def r_wifi_scan(): nets = scan_wifi_networks() return jsonify(nets) @app.route('/api/wifi/connect', methods=['POST']) def r_wifi_connect(): data = request.get_json(force=True) ssid = data.get('ssid', '').strip() pw = data.get('password', '').strip() if not ssid: return jsonify(error='SSID fehlt'), 400 cfg = load_cfg() cfg['wifi_ssid'] = ssid cfg['wifi_password'] = pw save_cfg(cfg) def _connect(): ap_was_active = is_ap_active() if ap_was_active: stop_ap() time.sleep(2) ok = connect_client_wifi(ssid, pw) if ok: time.sleep(5) update_wifi_state() else: if ap_was_active: start_ap(cfg.get('ap_ssid', 'PiCopy'), cfg.get('ap_password', 'PiCopy,')) update_wifi_state() threading.Thread(target=_connect, daemon=True).start() return jsonify(ok=True, msg='Verbindungsversuch gestartet') @app.route('/api/wifi/ap', methods=['POST']) def r_wifi_ap(): data = request.get_json(force=True) ssid = data.get('ssid', '').strip() pw = data.get('password', '').strip() if not ssid or len(pw) < 8: return jsonify(error='SSID fehlt oder Passwort zu kurz (min. 8 Zeichen)'), 400 cfg = load_cfg() cfg['ap_ssid'] = ssid cfg['ap_password'] = pw save_cfg(cfg) def _restart_ap(): if is_ap_active(): stop_ap() time.sleep(2) start_ap(ssid, pw) time.sleep(3) with wifi_lock: wifi_state.update(mode='ap', ssid=ssid, ip='10.42.0.1') threading.Thread(target=_restart_ap, daemon=True).start() return jsonify(ok=True) @app.route('/api/wifi/status') def r_wifi_status(): with wifi_lock: return jsonify(dict(wifi_state)) # -- WireGuard Routes --------------------------------------------------------- @app.route('/api/wireguard/config', methods=['GET', 'POST']) def r_wg_config(): if request.method == 'POST': data = request.get_json(force=True) content = data.get('content', '') if not content.strip(): return jsonify(error='Konfiguration ist leer'), 400 ok, err = wg_save_config(content) if not ok: return jsonify(error=err), 500 auto = data.get('auto') if auto is not None: c = load_cfg() c['wireguard_auto'] = bool(auto) save_cfg(c) with wg_lock: wg_state['has_config'] = True return jsonify(ok=True) if WG_CONF.exists(): content = WG_CONF.read_text(encoding='utf-8') masked = re.sub(r'(PrivateKey\s*=\s*)(.+)', r'\1****', content) return jsonify(exists=True, config=masked) return jsonify(exists=False, config='') @app.route('/api/wireguard/connect', methods=['POST']) def r_wg_connect(): threading.Thread(target=wg_connect, daemon=True).start() return jsonify(ok=True, msg='Verbindungsversuch gestartet') @app.route('/api/wireguard/disconnect', methods=['POST']) def r_wg_disconnect(): ok = wg_disconnect() return jsonify(ok=ok) @app.route('/api/wireguard/install', methods=['POST']) def r_wg_install(): with wg_lock: if wg_state['pkg_running']: return jsonify(error='Bereits aktiv'), 400 threading.Thread(target=wg_install, daemon=True).start() return jsonify(ok=True) @app.route('/api/wireguard/uninstall', methods=['POST']) def r_wg_uninstall(): with wg_lock: if wg_state['pkg_running']: return jsonify(error='Bereits aktiv'), 400 threading.Thread(target=wg_uninstall, daemon=True).start() return jsonify(ok=True) # -- Upload Routes -------------------------------------------------------------- @app.route('/api/upload/targets', methods=['GET']) def r_upload_list(): return jsonify(load_cfg().get('upload_targets', [])) @app.route('/api/upload/targets', methods=['POST']) def r_upload_add(): data = request.get_json(force=True) cfg = load_cfg() tid = data.get('id') or _uuid_mod.uuid4().hex[:8] ctype = data.get('type', 'smb') if ctype != 'smb': return jsonify(error='Nur SMB/NAS wird unterstützt'), 400 ok, err = configure_smb_remote( tid, data.get('host', ''), data.get('share', ''), data.get('user', ''), data.get('pass', '')) if not ok: return jsonify(error=f'rclone: {err}'), 500 entry = { 'id': tid, 'type': ctype, 'name': data.get('name', ctype), 'dest_path': data.get('dest_path', 'PiCopy'), 'enabled': True, } targets = [t for t in cfg.get('upload_targets', []) if t['id'] != tid] targets.append(entry) cfg['upload_targets'] = targets save_cfg(cfg) return jsonify(ok=True, id=tid) @app.route('/api/upload/targets/', methods=['DELETE']) def r_upload_del(tid): cfg = load_cfg() cfg['upload_targets'] = [t for t in cfg.get('upload_targets', []) if t['id'] != tid] save_cfg(cfg) delete_remote(tid) return jsonify(ok=True) @app.route('/api/upload/targets//toggle', methods=['POST']) def r_upload_toggle(tid): cfg = load_cfg() for t in cfg.get('upload_targets', []): if t['id'] == tid: t['enabled'] = not t.get('enabled', True) break save_cfg(cfg) return jsonify(ok=True) @app.route('/api/upload/targets//test', methods=['POST']) def r_upload_test(tid): ok, err = test_remote(tid) return jsonify(ok=ok, error=err) @app.route('/api/upload/status') def r_upload_status(): with upload_lock: return jsonify(dict(upload_state)) # -- Browse (persistente Mounts für File-Explorer) ----------------------------- _browse_mounts = {} # usb_port -> mount_point def _mp_is_alive(mp): """Prüft ob ein Mount-Punkt wirklich aktiv und lesbar ist.""" try: with open('/proc/mounts') as f: mounted = any(mp in line.split() for line in f) if not mounted: return False os.listdir(mp) # I/O-Test: schlägt fehl wenn Gerät entfernt wurde return True except Exception: return False def _drop_browse_mount(port): """Veralteten Mount bereinigen.""" mp = _browse_mounts.pop(port, None) if mp: subprocess.run(['umount', '-l', mp], capture_output=True) log.info(f'Browse-Mount bereinigt: {mp}') def get_browse_mp(dev): port = dev.get('usb_port', '') # Auto-mount vom System bevorzugen if dev.get('mount') and _mp_is_alive(dev['mount']): return dev['mount'] # Gecachten Mount prüfen mp = _browse_mounts.get(port) if mp: if _mp_is_alive(mp): return mp _drop_browse_mount(port) # veraltet -> aufräumen # Frisch mounten mp = f'/mnt/picopy_br_{port}' os.makedirs(mp, exist_ok=True) r = subprocess.run(['mount', dev['device'], mp], capture_output=True) if r.returncode == 0: _browse_mounts[port] = mp return mp return None @app.route('/api/browse') def r_browse(): port = request.args.get('port', '') rpath = request.args.get('path', '').lstrip('/') devs = usb_devices() dev = next((d for d in devs if d['usb_port'] == port), None) if not dev: return jsonify(error='Gerät nicht verbunden - bitte neu einstecken'), 404 mp = get_browse_mp(dev) if not mp: return jsonify(error='Gerät nicht lesbar - bitte neu einstecken'), 500 try: base = Path(mp).resolve() target = (base / rpath).resolve() if not str(target).startswith(str(base)): return jsonify(error='Ungültiger Pfad'), 400 if not target.is_dir(): return jsonify(error='Kein Verzeichnis'), 400 entries = [] for item in sorted(target.iterdir(), key=lambda x: (x.is_file(), x.name.lower())): try: s = item.stat() entries.append({ 'name': item.name, 'dir': item.is_dir(), 'size': s.st_size if item.is_file() else None, 'mtime': datetime.fromtimestamp(s.st_mtime).strftime('%d.%m.%y %H:%M'), }) except OSError: pass rel = str(target.relative_to(base)) return jsonify(path='' if rel == '.' else rel, entries=entries) except OSError as e: import errno as _errno if e.errno == _errno.EIO: # I/O-Fehler = Gerät abgezogen, Mount bereinigen _drop_browse_mount(port) return jsonify(error='Gerät nicht mehr erreichbar - bitte neu einstecken'), 503 return jsonify(error=str(e)), 500 except Exception as e: return jsonify(error=str(e)), 500 # -- Update-System ------------------------------------------------------------- update_state = { 'current': VERSION, 'latest': None, 'available': False, 'checking': False, 'error': None, 'last_checked': None, } update_lock = threading.Lock() def _vtuple(v): try: return tuple(int(x) for x in v.strip().lstrip('v').split('.')) except Exception: return (0,) def check_for_updates(): with update_lock: if update_state['checking']: return update_state['checking'] = True update_state['error'] = None try: req = _urlreq.urlopen(f'{RAW_BASE}/version.txt', timeout=10) latest = req.read().decode().strip() avail = _vtuple(latest) > _vtuple(VERSION) with update_lock: update_state.update(latest=latest, available=avail, last_checked=datetime.now().isoformat()) if avail: log.info(f'Update verfügbar: {VERSION} -> {latest}') except Exception as e: with update_lock: update_state['error'] = str(e) log.warning(f'Update-Check fehlgeschlagen: {e}') finally: with update_lock: update_state['checking'] = False def update_check_loop(): time.sleep(5) # Kurz nach Start einmalig prüfen while True: check_for_updates() time.sleep(6 * 3600) # Dann alle 6 Stunden @app.route('/api/update/status') def r_update_status(): with update_lock: return jsonify(dict(update_state)) @app.route('/api/update/check', methods=['POST']) def r_update_check(): threading.Thread(target=check_for_updates, daemon=True).start() return jsonify(ok=True) @app.route('/api/system/reboot', methods=['POST']) def r_system_reboot(): threading.Thread(target=lambda: ( __import__('time').sleep(1), subprocess.Popen(['reboot']) ), daemon=True).start() return jsonify(ok=True) @app.route('/api/update/install', methods=['POST']) def r_update_install(): try: log.info('Update wird heruntergeladen...') req = _urlreq.urlopen(f'{RAW_BASE}/app.py', timeout=60) new_code = req.read().decode() vreq = _urlreq.urlopen(f'{RAW_BASE}/version.txt', timeout=10) new_version = vreq.read().decode().strip() # Syntax-Check bevor wir irgendetwas überschreiben compile(new_code, 'app.py', 'exec') tmp = Path('/opt/picopy/app.py.tmp') tmp.write_text(new_code, encoding='utf-8') with open(tmp, 'rb') as fh: os.fsync(fh.fileno()) # Sicherstellen dass Daten auf der Platte sind os.replace(str(tmp), '/opt/picopy/app.py') # Atomares Umbenennen vtmp = Path('/opt/picopy/version.txt.tmp') vtmp.write_text(new_version + '\n', encoding='utf-8') with open(vtmp, 'rb') as fh: os.fsync(fh.fileno()) os.replace(str(vtmp), '/opt/picopy/version.txt') log.info('Update installiert - starte Dienst neu...') # Systemd startet den Dienst automatisch neu subprocess.Popen(['systemctl', 'restart', 'picopy']) return jsonify(ok=True) except SyntaxError as e: return jsonify(error=f'Update-Datei ungültig: {e}'), 500 except Exception as e: log.exception('Update fehlgeschlagen') return jsonify(error=str(e)), 500 # -- HTML Template ------------------------------------------------------------- HTML = r""" PiCopy
verfügbar
Verbinde...
Kopierstatus
Bereit
USB Ports & Datei-Explorer
+ Quelle hinzufügen
Gerät einstecken → aus Liste wählen → Hinzufügen. Mehrere Quellen werden nacheinander auf dasselbe Ziel kopiert.
▼ Ziel
-
Kein Port konfiguriert
Gerät in den gewünschten Port → aus Liste wählen → Speichern. Ab dann wird dieser Port immer als Ziel verwendet.
Port konfigurieren und Gerät verbinden
Kopier-Einstellungen
Ordnerstruktur
Dateifilter
Duplikate
Integrität & Aufräumen
^
Fernkopie - NAS / SMB
WiFi-Einstellungen
Heimnetz
Hotspot (AP)
Heimnetz für die Router-Verbindung. Ohne Verbindung startet PiCopy automatisch einen eigenen Hotspot.
Startet automatisch wenn kein Heimnetz erreichbar ist.
IP im Hotspot-Modus: 10.42.0.1:8080
WireGuard VPN
System
=
Logs
Noch keine Einträge
""" if __name__ == '__main__': cleanup_stale_mounts() load_state() wg_update_state() threading.Thread(target=usb_monitor, daemon=True).start() threading.Thread(target=wifi_monitor, daemon=True).start() threading.Thread(target=wg_monitor, daemon=True).start() threading.Thread(target=update_check_loop, daemon=True).start() if load_cfg().get('wireguard_auto') and WG_CONF.exists(): threading.Thread(target=wg_connect, daemon=True).start() log.info(f'PiCopy v{VERSION} läuft auf http://0.0.0.0:8080') app.run(host='0.0.0.0', port=8080, debug=False, use_reloader=False)