#!/usr/bin/env python3 """PiCopy v2 - USB Copy Service mit WiFi-Fallback AP""" import os import re import json import shutil import logging import threading import subprocess import time from datetime import datetime from pathlib import Path from flask import Flask, jsonify, request app = Flask(__name__) BASE_DIR = Path('/opt/picopy') CONFIG_FILE = BASE_DIR / 'config.json' STATE_FILE = BASE_DIR / 'state.json' LOG_DIR = BASE_DIR / 'logs' LOG_FILE = LOG_DIR / 'picopy.log' LOG_DIR.mkdir(parents=True, exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()] ) log = logging.getLogger('picopy') NM_AP_CON = 'PiCopy-AP' NM_CLIENT_CON = 'PiCopy-WiFi' WIFI_BOOT_WAIT = 25 # Sekunden warten beim Start bevor AP gestartet wird DEFAULT_CONFIG = { # USB 'source_port': None, 'dest_port': None, 'folder_format': '%Y-%m-%d', 'add_time': True, 'subfolder': True, 'auto_copy': True, # WiFi 'wifi_ssid': '', 'wifi_password': '', 'ap_ssid': 'PiCopy', 'ap_password': 'PiCopy,', } # ── Persistenter Kopierstatus ─────────────────────────────────────────────── copy_state = { 'running': False, 'progress': 0, 'total': 0, 'done': 0, 'current': '', 'error': None, 'last_copy': None, 'logs': [], } copy_lock = threading.Lock() def load_state(): global copy_state try: if STATE_FILE.exists(): saved = json.loads(STATE_FILE.read_text()) # Nur nicht-laufende Daten wiederherstellen saved['running'] = False saved['current'] = '' copy_state.update(saved) except Exception: pass def save_state(): try: with copy_lock: data = dict(copy_state) STATE_FILE.write_text(json.dumps(data)) except Exception: pass # ── WiFi Status ───────────────────────────────────────────────────────────── wifi_state = { 'mode': 'unknown', # 'client' | 'ap' | 'disconnected' 'ssid': '', 'ip': '', } wifi_lock = threading.Lock() # ── Config ─────────────────────────────────────────────────────────────────── def load_cfg(): cfg = DEFAULT_CONFIG.copy() try: if CONFIG_FILE.exists(): cfg.update(json.loads(CONFIG_FILE.read_text())) except Exception: pass return cfg def save_cfg(cfg): CONFIG_FILE.write_text(json.dumps(cfg, indent=2)) # ── WiFi Hilfsfunktionen ───────────────────────────────────────────────────── def nm(*args): return subprocess.run(['nmcli'] + list(args), capture_output=True, text=True, timeout=20) def get_wlan0_info(): r = nm('-t', '-f', 'DEVICE,STATE,CONNECTION', 'dev') for line in r.stdout.splitlines(): parts = line.split(':') if parts and parts[0] == 'wlan0': return { 'state': parts[1] if len(parts) > 1 else '', 'connection': ':'.join(parts[2:]) if len(parts) > 2 else '', } return {'state': '', 'connection': ''} def get_wifi_ip(): r = nm('-t', '-f', 'IP4.ADDRESS', 'dev', 'show', 'wlan0') for line in r.stdout.splitlines(): if 'IP4.ADDRESS' in line: ip = line.split(':')[-1].split('/')[0].strip() if ip: return ip return '' def is_client_connected(): info = get_wlan0_info() return (info['state'] == 'connected' and info['connection'] and NM_AP_CON not in info['connection']) def is_ap_active(): r = nm('-t', '-f', 'NAME,STATE', 'con', 'show', '--active') return any(NM_AP_CON in l and 'activated' in l for l in r.stdout.splitlines()) def start_ap(ssid, password): log.info(f'Starte AP: {ssid}') nm('con', 'delete', NM_AP_CON) time.sleep(1) r = nm('dev', 'wifi', 'hotspot', 'ifname', 'wlan0', 'ssid', ssid, 'password', password, 'con-name', NM_AP_CON) ok = r.returncode == 0 if ok: log.info('AP gestartet') else: log.error(f'AP Fehler: {r.stderr}') return ok def stop_ap(): log.info('Stoppe AP') nm('con', 'down', NM_AP_CON) def connect_client_wifi(ssid, password): log.info(f'Verbinde mit WiFi: {ssid}') # Bestehende PiCopy-WiFi Verbindung löschen nm('con', 'delete', NM_CLIENT_CON) time.sleep(1) r = nm('dev', 'wifi', 'connect', ssid, 'password', password, 'name', NM_CLIENT_CON, 'ifname', 'wlan0') ok = r.returncode == 0 if ok: log.info(f'Verbunden mit {ssid}') else: log.error(f'WiFi-Verbindung fehlgeschlagen: {r.stderr.strip()}') return ok def scan_wifi_networks(): nm('dev', 'wifi', 'rescan') time.sleep(2) r = nm('-t', '-f', 'SSID,SIGNAL,SECURITY', 'dev', 'wifi', 'list') seen, nets = set(), [] for line in r.stdout.splitlines(): parts = line.split(':') if len(parts) >= 2: ssid = parts[0].strip() signal = parts[1].strip() if len(parts) > 1 else '0' security = ':'.join(parts[2:]).strip() if len(parts) > 2 else '' if ssid and ssid not in seen: seen.add(ssid) nets.append({'ssid': ssid, 'signal': int(signal) if signal.isdigit() else 0, 'security': security}) return sorted(nets, key=lambda x: -x['signal']) # ── WiFi Monitor Thread ─────────────────────────────────────────────────────── def update_wifi_state(): info = get_wlan0_info() if info['state'] == 'connected': if NM_AP_CON in info['connection']: with wifi_lock: wifi_state.update(mode='ap', ssid=load_cfg().get('ap_ssid', 'PiCopy'), ip='10.42.0.1') else: ip = get_wifi_ip() with wifi_lock: wifi_state.update(mode='client', ssid=info['connection'], ip=ip) else: with wifi_lock: wifi_state.update(mode='disconnected', ssid='', ip='') def wifi_monitor(): log.info(f'WiFi-Monitor: warte {WIFI_BOOT_WAIT}s auf Verbindung...') time.sleep(WIFI_BOOT_WAIT) while True: try: update_wifi_state() with wifi_lock: mode = wifi_state['mode'] if mode == 'disconnected': cfg = load_cfg() ssid = cfg.get('wifi_ssid', '') pw = cfg.get('wifi_password', '') connected = False if ssid: connected = connect_client_wifi(ssid, pw) if connected: time.sleep(5) update_wifi_state() if not connected: ap_ssid = cfg.get('ap_ssid', 'PiCopy') ap_pw = cfg.get('ap_password', 'PiCopy,') if start_ap(ap_ssid, ap_pw): time.sleep(3) with wifi_lock: wifi_state.update(mode='ap', ssid=ap_ssid, ip='10.42.0.1') except Exception as e: log.error(f'WiFi-Monitor Fehler: {e}') time.sleep(30) # ── USB Geräteerkennung ─────────────────────────────────────────────────────── def usb_port_of(dev_name): try: real = Path(f'/sys/block/{dev_name}').resolve() port = None for seg in str(real).split('/'): if re.fullmatch(r'\d+[\-\d.]+', seg) and ':' not in seg: port = seg return port except Exception: return None def usb_devices(): try: out = subprocess.check_output( ['lsblk', '-J', '-o', 'NAME,TRAN,MOUNTPOINT,LABEL,SIZE,MODEL'], timeout=10, text=True ) data = json.loads(out) except Exception as e: log.error(f'lsblk: {e}') return [] result = [] for bd in data.get('blockdevices', []): if bd.get('tran') != 'usb': continue name = bd['name'] port = usb_port_of(name) model = (bd.get('label') or bd.get('model') or name).strip() for child in (bd.get('children') or []): result.append({ 'device': f'/dev/{child["name"]}', 'usb_port': port, 'mount': child.get('mountpoint') or '', 'label': (child.get('label') or model).strip(), 'size': child.get('size') or bd.get('size') or '', }) if not bd.get('children'): result.append({ 'device': f'/dev/{name}', 'usb_port': port, 'mount': bd.get('mountpoint') or '', 'label': model, 'size': bd.get('size') or '', }) return result def ensure_mount(dev_info): mp = dev_info.get('mount') if mp: return mp, False dev = dev_info['device'] mp = f'/mnt/picopy{dev.replace("/","_")}' os.makedirs(mp, exist_ok=True) r = subprocess.run(['mount', dev, mp], capture_output=True) if r.returncode: log.error(f'mount failed: {r.stderr.decode()}') return None, False return mp, True # ── Kopier-Logik ────────────────────────────────────────────────────────────── def add_log(msg): log.info(msg) with copy_lock: copy_state['logs'].append({'t': datetime.now().strftime('%H:%M:%S'), 'm': msg}) copy_state['logs'] = copy_state['logs'][-200:] def do_copy(src_dev, dst_dev, cfg): src_mp = dst_mp = None src_owned = dst_owned = False try: with copy_lock: copy_state.update(running=True, progress=0, error=None, done=0, total=0, logs=[], current='') save_state() add_log('Kopiervorgang gestartet') src_mp, src_owned = ensure_mount(src_dev) if not src_mp: raise RuntimeError(f'Quelle nicht mountbar: {src_dev["device"]}') add_log(f'Quelle: {src_mp} ({src_dev["label"]})') dst_mp, dst_owned = ensure_mount(dst_dev) if not dst_mp: raise RuntimeError(f'Ziel nicht mountbar: {dst_dev["device"]}') add_log(f'Ziel: {dst_mp} ({dst_dev["label"]})') ts = datetime.now() date_str = ts.strftime(cfg['folder_format']) if cfg.get('add_time'): date_str += '_' + ts.strftime('%H%M%S') label = re.sub(r'[^\w\-]', '_', src_dev.get('label', 'source')) dst_dir = Path(dst_mp) / date_str if cfg.get('subfolder'): dst_dir = dst_dir / label dst_dir.mkdir(parents=True, exist_ok=True) add_log(f'Zielordner: {dst_dir}') src_path = Path(src_mp) files = [f for f in src_path.rglob('*') if f.is_file()] total = len(files) with copy_lock: copy_state['total'] = total add_log(f'{total} Dateien gefunden') save_state() for i, f in enumerate(files): with copy_lock: if not copy_state['running']: add_log('Abgebrochen') return dst_f = dst_dir / f.relative_to(src_path) dst_f.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(f, dst_f) with copy_lock: copy_state.update(done=i+1, progress=int((i+1)/total*100) if total else 100, current=str(f.name)) if (i+1) % 20 == 0: save_state() with copy_lock: copy_state['last_copy'] = datetime.now().isoformat() add_log(f'Fertig! {total} Dateien kopiert nach {dst_dir.name}') except Exception as e: log.exception('Copy failed') with copy_lock: copy_state['error'] = str(e) add_log(f'Fehler: {e}') finally: if src_owned and src_mp: subprocess.run(['umount', src_mp], capture_output=True) if dst_owned and dst_mp: subprocess.run(['umount', dst_mp], capture_output=True) with copy_lock: copy_state['running'] = False copy_state['current'] = '' save_state() def check_auto_copy(): cfg = load_cfg() if not cfg.get('auto_copy') or not cfg.get('source_port') or not cfg.get('dest_port'): return with copy_lock: if copy_state['running']: return devs = usb_devices() src = next((d for d in devs if d['usb_port'] == cfg['source_port']), None) dst = next((d for d in devs if d['usb_port'] == cfg['dest_port']), None) if src and dst: log.info('Auto-Copy: beide Geräte verbunden') threading.Thread(target=do_copy, args=(src, dst, cfg), daemon=True).start() def usb_monitor(): try: import pyudev ctx = pyudev.Context() mon = pyudev.Monitor.from_netlink(ctx) mon.filter_by(subsystem='block', device_type='disk') for dev in iter(mon.poll, None): if dev.action == 'add': log.info(f'USB eingesteckt: {dev.device_node}') threading.Timer(3.0, check_auto_copy).start() except ImportError: log.warning('pyudev nicht verfügbar') # ── Flask Routes ────────────────────────────────────────────────────────────── @app.route('/') def index(): return HTML @app.route('/api/devices') def r_devices(): return jsonify(usb_devices()) @app.route('/api/config', methods=['GET', 'POST']) def r_config(): if request.method == 'POST': cfg = load_cfg() cfg.update(request.get_json(force=True)) save_cfg(cfg) return jsonify(ok=True) return jsonify(load_cfg()) @app.route('/api/status') def r_status(): with copy_lock: cs = dict(copy_state) with wifi_lock: ws = dict(wifi_state) return jsonify(copy=cs, wifi=ws) @app.route('/api/copy/start', methods=['POST']) def r_start(): with copy_lock: if copy_state['running']: return jsonify(error='Bereits aktiv'), 400 cfg = load_cfg() devs = usb_devices() src = next((d for d in devs if d['usb_port'] == cfg.get('source_port')), None) dst = next((d for d in devs if d['usb_port'] == cfg.get('dest_port')), None) if not src: return jsonify(error='Quellgerät nicht gefunden'), 400 if not dst: return jsonify(error='Zielgerät nicht gefunden'), 400 threading.Thread(target=do_copy, args=(src, dst, cfg), daemon=True).start() return jsonify(ok=True) @app.route('/api/copy/cancel', methods=['POST']) def r_cancel(): with copy_lock: copy_state['running'] = False return jsonify(ok=True) @app.route('/api/wifi/scan') def r_wifi_scan(): nets = scan_wifi_networks() return jsonify(nets) @app.route('/api/wifi/connect', methods=['POST']) def r_wifi_connect(): data = request.get_json(force=True) ssid = data.get('ssid', '').strip() pw = data.get('password', '').strip() if not ssid: return jsonify(error='SSID fehlt'), 400 cfg = load_cfg() cfg['wifi_ssid'] = ssid cfg['wifi_password'] = pw save_cfg(cfg) def _connect(): ap_was_active = is_ap_active() if ap_was_active: stop_ap() time.sleep(2) ok = connect_client_wifi(ssid, pw) if ok: time.sleep(5) update_wifi_state() else: if ap_was_active: start_ap(cfg.get('ap_ssid', 'PiCopy'), cfg.get('ap_password', 'PiCopy,')) update_wifi_state() threading.Thread(target=_connect, daemon=True).start() return jsonify(ok=True, msg='Verbindungsversuch gestartet') @app.route('/api/wifi/ap', methods=['POST']) def r_wifi_ap(): data = request.get_json(force=True) ssid = data.get('ssid', '').strip() pw = data.get('password', '').strip() if not ssid or len(pw) < 8: return jsonify(error='SSID fehlt oder Passwort zu kurz (min. 8 Zeichen)'), 400 cfg = load_cfg() cfg['ap_ssid'] = ssid cfg['ap_password'] = pw save_cfg(cfg) def _restart_ap(): if is_ap_active(): stop_ap() time.sleep(2) start_ap(ssid, pw) time.sleep(3) with wifi_lock: wifi_state.update(mode='ap', ssid=ssid, ip='10.42.0.1') threading.Thread(target=_restart_ap, daemon=True).start() return jsonify(ok=True) @app.route('/api/wifi/status') def r_wifi_status(): with wifi_lock: return jsonify(dict(wifi_state)) # ── HTML Template ───────────────────────────────────────────────────────────── HTML = r""" PiCopy

PiCopy

Verbinde…

Kopierstatus

Bereit

Verbundene USB-Geräte

Lade…

Kopier-Einstellungen

WiFi-Einstellungen

Heimnetz
Hotspot (AP)
WLAN für die Verbindung mit deinem Router. Wenn nicht erreichbar, startet PiCopy automatisch einen eigenen Hotspot.
Der Hotspot wird automatisch gestartet wenn kein Heimnetz erreichbar ist.
IP: 10.42.0.1  ·  Port: 8080

Protokoll

Noch keine Einträge
""" if __name__ == '__main__': load_state() threading.Thread(target=usb_monitor, daemon=True).start() threading.Thread(target=wifi_monitor, daemon=True).start() log.info('PiCopy v2 läuft auf http://0.0.0.0:8080') app.run(host='0.0.0.0', port=8080, debug=False, use_reloader=False)