"""PiCopy – WiFi: wifi_state, wifi_lock, nm(), Helpers, wifi_monitor.""" import subprocess import threading import time from picopy.config import ( NM_AP_CON, NM_CLIENT_CON, load_cfg, log ) wifi_state = { 'mode': 'unknown', # 'client' | 'ap' | 'disconnected' 'ssid': '', 'ip': '', } wifi_lock = threading.Lock() def nm(*args): return subprocess.run(['nmcli'] + list(args), capture_output=True, text=True, timeout=20) def get_wlan0_info(): r = nm('-t', '-f', 'DEVICE,STATE,CONNECTION', 'dev') for line in r.stdout.splitlines(): parts = line.split(':') if parts and parts[0] == 'wlan0': return { 'state': parts[1] if len(parts) > 1 else '', 'connection': ':'.join(parts[2:]) if len(parts) > 2 else '', } return {'state': '', 'connection': ''} def get_wifi_ip(): r = nm('-t', '-f', 'IP4.ADDRESS', 'dev', 'show', 'wlan0') for line in r.stdout.splitlines(): if 'IP4.ADDRESS' in line: ip = line.split(':')[-1].split('/')[0].strip() if ip: return ip return '' def is_client_connected(): info = get_wlan0_info() return (info['state'] == 'connected' and info['connection'] and NM_AP_CON not in info['connection']) def is_ap_active(): r = nm('-t', '-f', 'NAME,STATE', 'con', 'show', '--active') return any(NM_AP_CON in l and 'activated' in l for l in r.stdout.splitlines()) def start_ap(ssid, password): if len(password) < 8: log.error(f'AP-Passwort zu kurz ({len(password)} Zeichen, min. 8 für WPA2)') return False log.info(f'Starte AP: {ssid}') nm('con', 'delete', NM_AP_CON) time.sleep(1) r = nm('dev', 'wifi', 'hotspot', 'ifname', 'wlan0', 'ssid', ssid, 'password', password, 'con-name', NM_AP_CON) ok = r.returncode == 0 if ok: log.info('AP gestartet') else: log.error(f'AP Fehler: {r.stderr}') return ok def stop_ap(): log.info('Stoppe AP') nm('con', 'down', NM_AP_CON) def _takeover_wifi_autoconnect(): """Deaktiviert Autoconnect aller Fremd-WLAN-Profile, damit NM beim Boot PiCopy-WiFi bevorzugt.""" r = nm('-t', '-f', 'NAME,TYPE', 'con', 'show') for line in r.stdout.splitlines(): parts = line.split(':', 1) if len(parts) == 2 and parts[1] == '802-11-wireless' and parts[0] not in (NM_CLIENT_CON, NM_AP_CON): nm('con', 'mod', parts[0], 'connection.autoconnect', 'no') log.info(f'Autoconnect für Fremdprofil deaktiviert: {parts[0]}') nm('con', 'mod', NM_CLIENT_CON, 'connection.autoconnect', 'yes') def connect_client_wifi(ssid, password): log.info(f'Verbinde mit WiFi: {ssid}') nm('con', 'delete', NM_CLIENT_CON) time.sleep(1) try: # --wait 15: nmcli gibt nach 15 s auf; subprocess-Timeout als Sicherheitsnetz r = subprocess.run( ['nmcli', '--wait', '15', 'dev', 'wifi', 'connect', ssid, 'password', password, 'name', NM_CLIENT_CON, 'ifname', 'wlan0'], capture_output=True, text=True, timeout=25, ) except subprocess.TimeoutExpired: log.warning(f'WiFi-Verbindung Timeout (SSID: {ssid})') return False ok = r.returncode == 0 if ok: log.info(f'Verbunden mit {ssid}') _takeover_wifi_autoconnect() else: log.error(f'WiFi-Verbindung fehlgeschlagen: {r.stderr.strip()}') return ok def scan_wifi_networks(): nm('dev', 'wifi', 'rescan') time.sleep(2) r = nm('-t', '-f', 'SSID,SIGNAL,SECURITY', 'dev', 'wifi', 'list') seen, nets = set(), [] for line in r.stdout.splitlines(): parts = line.split(':') if len(parts) >= 2: ssid = parts[0].strip() signal = parts[1].strip() if len(parts) > 1 else '0' security = ':'.join(parts[2:]).strip() if len(parts) > 2 else '' if ssid and ssid not in seen: seen.add(ssid) nets.append({'ssid': ssid, 'signal': int(signal) if signal.isdigit() else 0, 'security': security}) return sorted(nets, key=lambda x: -x['signal']) def update_wifi_state(): info = get_wlan0_info() if info['state'] == 'connected': if NM_AP_CON in info['connection']: with wifi_lock: wifi_state.update(mode='ap', ssid=load_cfg().get('ap_ssid', 'PiCopy'), ip='10.42.0.1') else: ip = get_wifi_ip() with wifi_lock: wifi_state.update(mode='client', ssid=info['connection'], ip=ip) else: with wifi_lock: wifi_state.update(mode='disconnected', ssid='', ip='') def _wait_for_nm(max_wait: int = 30) -> None: """Wartet bis NetworkManager bereit ist, statt eine feste Zeit zu schlafen.""" for i in range(max_wait): r = nm('general', 'status') if r.returncode == 0: if i > 0: log.info(f'NetworkManager bereit nach {i}s') return time.sleep(1) log.warning('NetworkManager nach 30s noch nicht bereit') def wifi_monitor(): log.info('WiFi-Monitor: warte auf NetworkManager...') _wait_for_nm() first_run = True while True: try: update_wifi_state() with wifi_lock: mode = wifi_state['mode'] current_conn = wifi_state.get('ssid', '') cfg = load_cfg() picopy_ssid = cfg.get('wifi_ssid', '') # Beim ersten Durchlauf: falls ein Fremdprofil (Imager) verbunden ist # und PiCopy eine eigene SSID kennt → zu PiCopy-WiFi wechseln if first_run and mode == 'client' and current_conn != NM_CLIENT_CON and picopy_ssid: log.info(f'Fremdprofil aktiv ({current_conn}), wechsle zu PiCopy-WiFi ({picopy_ssid})...') connected = connect_client_wifi(picopy_ssid, cfg.get('wifi_password', '')) if connected: time.sleep(5) update_wifi_state() with wifi_lock: mode = wifi_state['mode'] first_run = False if mode == 'disconnected': ssid = picopy_ssid pw = cfg.get('wifi_password', '') connected = False if ssid: connected = connect_client_wifi(ssid, pw) if connected: time.sleep(5) update_wifi_state() if not connected: ap_ssid = cfg.get('ap_ssid', 'PiCopy') ap_pw = cfg.get('ap_password', 'PiCopy123') if start_ap(ap_ssid, ap_pw): time.sleep(3) with wifi_lock: wifi_state.update(mode='ap', ssid=ap_ssid, ip='10.42.0.1') except Exception as e: log.error(f'WiFi-Monitor Fehler: {e}') time.sleep(30)