"""PiCopy – WireGuard VPN: wg_state, wg_lock, alle wg_* Funktionen, wg_monitor.""" import os import re import shutil import subprocess import threading import time from pathlib import Path from picopy.config import log WG_CONF = Path('/etc/wireguard/picopy.conf') WG_IFACE = 'picopy' def wg_is_installed(): return shutil.which('wg-quick') is not None wg_state = { 'connected': False, 'ip': '', 'peer': '', 'error': None, 'has_config': False, 'installed': False, 'pkg_running': False, 'pkg_action': '', 'pkg_error': None, } wg_lock = threading.Lock() def wg_update_state(): inst = wg_is_installed() has_conf = WG_CONF.exists() if not inst: with wg_lock: wg_state.update(installed=False, connected=False, ip='', peer='', has_config=has_conf) return r = subprocess.run(['wg', 'show', WG_IFACE], capture_output=True, text=True, timeout=5) if r.returncode != 0: with wg_lock: wg_state.update(installed=True, connected=False, ip='', peer='', has_config=has_conf) return ip_r = subprocess.run(['ip', '-4', 'addr', 'show', WG_IFACE], capture_output=True, text=True, timeout=5) ip = '' for line in ip_r.stdout.splitlines(): if line.strip().startswith('inet '): ip = line.strip().split()[1].split('/')[0] break peer = '' for line in r.stdout.splitlines(): if line.startswith('peer:'): peer = line.split(':', 1)[-1].strip() break with wg_lock: wg_state.update(installed=True, connected=True, ip=ip, peer=peer, error=None, has_config=has_conf) def wg_connect(): if not WG_CONF.exists(): with wg_lock: wg_state['error'] = 'Keine Konfiguration vorhanden' return False r = subprocess.run(['wg-quick', 'up', WG_IFACE], capture_output=True, text=True, timeout=30) if r.returncode == 0: time.sleep(1) wg_update_state() log.info('WireGuard verbunden') return True lines = r.stderr.strip().splitlines() if r.stderr.strip() else [] real_errors = [l for l in lines if not l.strip().startswith('[#]')] err = (real_errors[-1] if real_errors else lines[-1] if lines else 'Unbekannter Fehler') if 'resolvconf' in err and 'not found' in err: err = 'resolvconf fehlt - bitte WireGuard deinstallieren und neu installieren (openresolv wird dann mitinstalliert)' with wg_lock: wg_state.update(connected=False, error=err) log.error(f'WireGuard Fehler: {err}') return False def wg_disconnect(): r = subprocess.run(['wg-quick', 'down', WG_IFACE], capture_output=True, text=True, timeout=15) with wg_lock: wg_state.update(connected=False, ip='', peer='', error=None) log.info('WireGuard getrennt') return r.returncode == 0 def _wg_apt(action: str, packages: list): """Führt apt-get install/remove aus und aktualisiert pkg_state.""" with wg_lock: if wg_state['pkg_running']: return wg_state.update(pkg_running=True, pkg_action=action, pkg_error=None) try: cmd = ['apt-get', action, '-y'] + packages r = subprocess.run(cmd, capture_output=True, text=True, timeout=300, env={**os.environ, 'DEBIAN_FRONTEND': 'noninteractive'}) if r.returncode != 0: err = (r.stderr.strip().splitlines()[-1] if r.stderr.strip() else f'apt-get {action} fehlgeschlagen') log.error(f'WireGuard apt {action}: {err}') with wg_lock: wg_state['pkg_error'] = err else: log.info(f'WireGuard apt {action} abgeschlossen') except Exception as e: with wg_lock: wg_state['pkg_error'] = str(e) finally: with wg_lock: wg_state['pkg_running'] = False wg_state['pkg_action'] = '' wg_update_state() def wg_install(): _wg_apt('install', ['wireguard', 'wireguard-tools', 'openresolv']) def wg_uninstall(): wg_disconnect() _wg_apt('remove', ['wireguard', 'wireguard-tools']) def wg_save_config(content: str): try: WG_CONF.parent.mkdir(parents=True, exist_ok=True) WG_CONF.write_text(content, encoding='utf-8') WG_CONF.chmod(0o600) return True, '' except Exception as e: return False, str(e) def wg_monitor(): while True: try: wg_update_state() except Exception: pass time.sleep(10)