"""PiCopy – WiFi: wifi_state, wifi_lock, nm(), Helpers, wifi_monitor.""" import subprocess import threading import time from picopy.config import ( NM_AP_CON, NM_CLIENT_CON, load_cfg, log ) wifi_state = { 'mode': 'unknown', # 'client' | 'ap' | 'disconnected' 'ssid': '', 'ip': '', } wifi_lock = threading.Lock() def nm(*args): return subprocess.run(['nmcli'] + list(args), capture_output=True, text=True, timeout=20) def get_wlan0_info(): r = nm('-t', '-f', 'DEVICE,STATE,CONNECTION', 'dev') for line in r.stdout.splitlines(): parts = line.split(':') if parts and parts[0] == 'wlan0': return { 'state': parts[1] if len(parts) > 1 else '', 'connection': ':'.join(parts[2:]) if len(parts) > 2 else '', } return {'state': '', 'connection': ''} def get_wifi_ip(): r = nm('-t', '-f', 'IP4.ADDRESS', 'dev', 'show', 'wlan0') for line in r.stdout.splitlines(): if 'IP4.ADDRESS' in line: ip = line.split(':')[-1].split('/')[0].strip() if ip: return ip return '' def is_client_connected(): info = get_wlan0_info() return (info['state'] == 'connected' and info['connection'] and NM_AP_CON not in info['connection']) def is_ap_active(): r = nm('-t', '-f', 'NAME,STATE', 'con', 'show', '--active') return any(NM_AP_CON in l and 'activated' in l for l in r.stdout.splitlines()) def start_ap(ssid, password): if len(password) < 8: log.error(f'AP-Passwort zu kurz ({len(password)} Zeichen, min. 8 für WPA2)') return False log.info(f'Starte AP: {ssid}') nm('con', 'delete', NM_AP_CON) time.sleep(1) r = nm('dev', 'wifi', 'hotspot', 'ifname', 'wlan0', 'ssid', ssid, 'password', password, 'con-name', NM_AP_CON) ok = r.returncode == 0 if ok: log.info('AP gestartet') else: log.error(f'AP Fehler: {r.stderr}') return ok def stop_ap(): log.info('Stoppe AP') nm('con', 'down', NM_AP_CON) def _purge_foreign_wifi_profiles(): """Löscht alle WLAN-Profile die nicht von PiCopy verwaltet werden (z.B. Imager-Profile).""" r = nm('-t', '-f', 'NAME,TYPE', 'con', 'show') for line in r.stdout.splitlines(): parts = line.split(':', 1) if len(parts) == 2 and parts[1] == '802-11-wireless' and parts[0] not in (NM_CLIENT_CON, NM_AP_CON): nm('con', 'delete', parts[0]) log.info(f'Fremdprofil gelöscht: {parts[0]}') def connect_client_wifi(ssid, password): log.info(f'Verbinde mit WiFi: {ssid}') nm('con', 'delete', NM_CLIENT_CON) time.sleep(1) # Scan erzwingen – sonst findet nmcli das Netz nach dem Boot nicht im Cache log.info('WiFi-Scan läuft...') nm('dev', 'wifi', 'rescan') time.sleep(4) try: # --wait 45: genug Zeit für Scan + WPA-Handshake + DHCP r = subprocess.run( ['nmcli', '--wait', '45', 'dev', 'wifi', 'connect', ssid, 'password', password, 'name', NM_CLIENT_CON, 'ifname', 'wlan0'], capture_output=True, text=True, timeout=55, ) except subprocess.TimeoutExpired: log.warning(f'WiFi-Verbindung Timeout (SSID: {ssid})') return False ok = r.returncode == 0 if ok: log.info(f'Verbunden mit {ssid}') else: log.error(f'WiFi-Verbindung fehlgeschlagen: {r.stderr.strip()}') return ok def scan_wifi_networks(): nm('dev', 'wifi', 'rescan') time.sleep(2) r = nm('-t', '-f', 'SSID,SIGNAL,SECURITY', 'dev', 'wifi', 'list') seen, nets = set(), [] for line in r.stdout.splitlines(): parts = line.split(':') if len(parts) >= 2: ssid = parts[0].strip() signal = parts[1].strip() if len(parts) > 1 else '0' security = ':'.join(parts[2:]).strip() if len(parts) > 2 else '' if ssid and ssid not in seen: seen.add(ssid) nets.append({'ssid': ssid, 'signal': int(signal) if signal.isdigit() else 0, 'security': security}) return sorted(nets, key=lambda x: -x['signal']) def update_wifi_state(): info = get_wlan0_info() if info['state'] == 'connected': if NM_AP_CON in info['connection']: with wifi_lock: wifi_state.update(mode='ap', ssid=load_cfg().get('ap_ssid', 'PiCopy'), ip='10.42.0.1') else: ip = get_wifi_ip() with wifi_lock: wifi_state.update(mode='client', ssid=info['connection'], ip=ip) else: with wifi_lock: wifi_state.update(mode='disconnected', ssid='', ip='') def _wait_for_nm(max_wait: int = 30) -> None: """Wartet bis NetworkManager bereit ist, statt eine feste Zeit zu schlafen.""" for i in range(max_wait): r = nm('general', 'status') if r.returncode == 0: if i > 0: log.info(f'NetworkManager bereit nach {i}s') return time.sleep(1) log.warning('NetworkManager nach 30s noch nicht bereit') def wifi_monitor(): log.info('WiFi-Monitor: warte auf NetworkManager...') _wait_for_nm() _purge_foreign_wifi_profiles() connect_failures = 0 while True: try: update_wifi_state() with wifi_lock: mode = wifi_state['mode'] cfg = load_cfg() ssid = cfg.get('wifi_ssid', '') pw = cfg.get('wifi_password', '') if mode == 'client': connect_failures = 0 # Verbindung OK – Zähler zurücksetzen elif mode == 'disconnected': connected = False if ssid: log.info(f'Verbindungsversuch {connect_failures + 1}: {ssid}') connected = connect_client_wifi(ssid, pw) if connected: connect_failures = 0 time.sleep(5) update_wifi_state() else: connect_failures += 1 # AP erst starten wenn kein SSID konfiguriert ODER nach 2 Fehlversuchen if not connected and (not ssid or connect_failures >= 2): ap_ssid = cfg.get('ap_ssid', 'PiCopy') ap_pw = cfg.get('ap_password', 'PiCopy123') log.info(f'Starte Fallback-AP nach {connect_failures} Fehlversuchen') if start_ap(ap_ssid, ap_pw): connect_failures = 0 time.sleep(3) with wifi_lock: wifi_state.update(mode='ap', ssid=ap_ssid, ip='10.42.0.1') except Exception as e: log.error(f'WiFi-Monitor Fehler: {e}') time.sleep(30)