168 lines
4.9 KiB
Python
168 lines
4.9 KiB
Python
"""PiCopy – WiFi: wifi_state, wifi_lock, nm(), Helpers, wifi_monitor."""
|
||
|
||
import subprocess
|
||
import threading
|
||
import time
|
||
|
||
from picopy.config import (
|
||
NM_AP_CON, NM_CLIENT_CON, WIFI_BOOT_WAIT,
|
||
load_cfg, log
|
||
)
|
||
|
||
# Shared snapshot of the current WiFi situation. Always read and write it
# while holding ``wifi_lock``.
wifi_state = {
    # 'unknown' until the first refresh, afterwards one of
    # 'client' | 'ap' | 'disconnected'.
    'mode': 'unknown',
    # Connection name in client mode, AP SSID in AP mode, '' otherwise.
    'ssid': '',
    # IPv4 address of wlan0 as a string, '' when not connected.
    'ip': '',
}

# Guards every access to ``wifi_state``.
wifi_lock = threading.Lock()
|
||
|
||
|
||
def nm(*args):
    """Run ``nmcli`` with the given arguments and return the result.

    Args:
        *args: Command-line arguments appended after ``nmcli``.

    Returns:
        subprocess.CompletedProcess: stdout/stderr are captured as text.

    Raises:
        subprocess.TimeoutExpired: if nmcli does not finish within 20 seconds.
    """
    command = ['nmcli', *args]
    return subprocess.run(command, capture_output=True, text=True, timeout=20)
|
||
|
||
|
||
def get_wlan0_info():
    """Return the NetworkManager state and active connection of ``wlan0``.

    Parses the terse output of ``nmcli -t -f DEVICE,STATE,CONNECTION dev``.

    Returns:
        dict: ``{'state': str, 'connection': str}``. Both values are empty
        strings when ``wlan0`` is not listed or the field is missing.
    """
    result = nm('-t', '-f', 'DEVICE,STATE,CONNECTION', 'dev')
    for line in result.stdout.splitlines():
        # Only the trailing CONNECTION field is free-form text; DEVICE and
        # STATE are taken up to the first / second ':' as before.
        device, _, remainder = line.partition(':')
        if device != 'wlan0':
            continue
        state, _, connection = remainder.partition(':')
        # Terse mode escapes ':' and '\' inside values with a backslash.
        # Undo that so callers see the real connection name. The order
        # matters: drop the escape in front of ':' first, then collapse
        # doubled backslashes.
        connection = connection.replace('\\:', ':').replace('\\\\', '\\')
        return {'state': state, 'connection': connection}
    return {'state': '', 'connection': ''}
|
||
|
||
|
||
def get_wifi_ip():
    """Return the first IPv4 address reported for ``wlan0``.

    Reads ``nmcli -t -f IP4.ADDRESS dev show wlan0`` and strips the
    ``/prefix`` part from the address.

    Returns:
        str: The address without prefix length, or ``''`` if none is found.
    """
    result = nm('-t', '-f', 'IP4.ADDRESS', 'dev', 'show', 'wlan0')
    addresses = (
        row.split(':')[-1].split('/')[0].strip()
        for row in result.stdout.splitlines()
        if 'IP4.ADDRESS' in row
    )
    # First non-empty candidate wins; fall back to an empty string.
    return next((addr for addr in addresses if addr), '')
|
||
|
||
|
||
def is_client_connected():
    """Tell whether ``wlan0`` is connected through a non-AP connection.

    Returns:
        bool: ``True`` when the device state is ``'connected'``, a connection
        name is present, and that name does not contain ``NM_AP_CON``.
        Always a real ``bool`` (never the empty connection string).
    """
    info = get_wlan0_info()
    connection = info['connection']
    return (
        info['state'] == 'connected'
        and bool(connection)
        and NM_AP_CON not in connection
    )
|
||
|
||
|
||
def is_ap_active():
    """Tell whether the PiCopy access-point connection is currently active.

    Scans ``nmcli -t -f NAME,STATE con show --active`` for a line that
    contains both ``NM_AP_CON`` and the word ``activated``.

    Returns:
        bool: ``True`` if such a line exists, ``False`` otherwise.
    """
    result = nm('-t', '-f', 'NAME,STATE', 'con', 'show', '--active')
    for row in result.stdout.splitlines():
        if NM_AP_CON in row and 'activated' in row:
            return True
    return False
|
||
|
||
|
||
def start_ap(ssid, password):
    """Create and start a WiFi hotspot on ``wlan0``.

    Any existing connection named ``NM_AP_CON`` is deleted first, then
    ``nmcli dev wifi hotspot`` is invoked with the given credentials.

    Args:
        ssid: Network name the hotspot should broadcast.
        password: Passphrase for the hotspot.

    Returns:
        bool: ``True`` if nmcli exited with status 0, ``False`` otherwise.
    """
    log.info(f'Starte AP: {ssid}')

    # Remove a stale profile so the hotspot is recreated from scratch.
    nm('con', 'delete', NM_AP_CON)
    time.sleep(1)  # presumably lets NetworkManager settle after the delete

    hotspot_args = (
        'dev', 'wifi', 'hotspot',
        'ifname', 'wlan0',
        'ssid', ssid,
        'password', password,
        'con-name', NM_AP_CON,
    )
    result = nm(*hotspot_args)

    if result.returncode != 0:
        log.error(f'AP Fehler: {result.stderr}')
        return False

    log.info('AP gestartet')
    return True
|
||
|
||
|
||
def stop_ap():
    """Bring the access-point connection ``NM_AP_CON`` down.

    The nmcli result is not inspected; the function returns ``None``.
    """
    log.info('Stoppe AP')
    down_args = ('con', 'down', NM_AP_CON)
    nm(*down_args)
|
||
|
||
|
||
def connect_client_wifi(ssid, password):
    """Connect ``wlan0`` to a WiFi network as a client.

    Args:
        ssid: Name of the network to join.
        password: Passphrase passed to nmcli.

    Returns:
        bool: ``True`` if nmcli exited with status 0, ``False`` otherwise.
    """
    log.info(f'Verbinde mit WiFi: {ssid}')

    # Delete any existing PiCopy WiFi connection profile first.
    nm('con', 'delete', NM_CLIENT_CON)
    time.sleep(1)

    connect_args = (
        'dev', 'wifi', 'connect', ssid,
        'password', password,
        'name', NM_CLIENT_CON,
        'ifname', 'wlan0',
    )
    result = nm(*connect_args)

    if result.returncode != 0:
        log.error(f'WiFi-Verbindung fehlgeschlagen: {result.stderr.strip()}')
        return False

    log.info(f'Verbunden mit {ssid}')
    return True
|
||
|
||
|
||
def _split_terse_fields(line):
    """Split one ``nmcli -t`` output line into unescaped field values.

    Terse mode separates fields with ':' and escapes ':' and '\\' inside a
    value with a leading backslash, so a plain ``str.split(':')`` would cut
    values such as an SSID containing a colon in half.
    """
    fields, current, escaped = [], [], False
    for char in line:
        if escaped:
            # Character after a backslash is taken literally.
            current.append(char)
            escaped = False
        elif char == '\\':
            escaped = True
        elif char == ':':
            fields.append(''.join(current))
            current = []
        else:
            current.append(char)
    fields.append(''.join(current))
    return fields


def scan_wifi_networks():
    """Rescan and list visible WiFi networks, strongest signal first.

    Triggers ``nmcli dev wifi rescan``, waits two seconds, then parses
    ``nmcli -t -f SSID,SIGNAL,SECURITY dev wifi list``. Entries with an
    empty SSID are skipped and each SSID is reported only once (first
    occurrence wins).

    Returns:
        list[dict]: Items of the form
        ``{'ssid': str, 'signal': int, 'security': str}`` sorted by
        descending signal. A non-numeric signal is reported as ``0``.
    """
    nm('dev', 'wifi', 'rescan')
    time.sleep(2)  # wait before reading the list after the rescan request
    result = nm('-t', '-f', 'SSID,SIGNAL,SECURITY', 'dev', 'wifi', 'list')

    seen, nets = set(), []
    for line in result.stdout.splitlines():
        fields = _split_terse_fields(line)
        if len(fields) < 2:
            continue

        ssid = fields[0].strip()
        if not ssid or ssid in seen:
            continue

        signal = fields[1].strip()
        security = ':'.join(fields[2:]).strip()
        seen.add(ssid)
        nets.append({
            'ssid': ssid,
            'signal': int(signal) if signal.isdigit() else 0,
            'security': security,
        })

    return sorted(nets, key=lambda net: net['signal'], reverse=True)
|
||
|
||
|
||
def update_wifi_state():
    """Refresh the shared ``wifi_state`` from the current state of ``wlan0``.

    Resulting modes:
        * ``'disconnected'`` - wlan0 is not in state ``'connected'``.
        * ``'ap'`` - the active connection name contains ``NM_AP_CON``;
          the SSID comes from the config and the IP is fixed to 10.42.0.1.
        * ``'client'`` - any other active connection; SSID is the connection
          name and the IP is looked up via ``get_wifi_ip()``.

    All writes to ``wifi_state`` happen while holding ``wifi_lock``.
    """
    wlan = get_wlan0_info()

    if wlan['state'] != 'connected':
        with wifi_lock:
            wifi_state.update(mode='disconnected', ssid='', ip='')
        return

    active_connection = wlan['connection']

    if NM_AP_CON not in active_connection:
        # Query the address before taking the lock.
        address = get_wifi_ip()
        with wifi_lock:
            wifi_state.update(mode='client', ssid=active_connection, ip=address)
        return

    with wifi_lock:
        wifi_state.update(
            mode='ap',
            ssid=load_cfg().get('ap_ssid', 'PiCopy'),
            ip='10.42.0.1',
        )
|
||
|
||
|
||
def _restore_connectivity():
    """Try the configured client WiFi; fall back to starting the AP.

    Reads ``wifi_ssid`` / ``wifi_password`` and ``ap_ssid`` / ``ap_password``
    from the config. The AP is started when no client SSID is configured or
    the client connection attempt fails.
    """
    cfg = load_cfg()
    client_ssid = cfg.get('wifi_ssid', '')
    client_pw = cfg.get('wifi_password', '')

    if client_ssid and connect_client_wifi(client_ssid, client_pw):
        time.sleep(5)
        update_wifi_state()
        return

    ap_ssid = cfg.get('ap_ssid', 'PiCopy')
    # NOTE(review): this fallback passphrase is 7 characters long; WPA-PSK
    # passphrases are normally 8-63 characters - confirm nmcli accepts it.
    ap_pw = cfg.get('ap_password', 'PiCopy,')
    if start_ap(ap_ssid, ap_pw):
        time.sleep(3)
        with wifi_lock:
            wifi_state.update(mode='ap', ssid=ap_ssid, ip='10.42.0.1')


def wifi_monitor():
    """Keep WiFi connectivity alive; loops forever and never returns.

    Waits ``WIFI_BOOT_WAIT`` seconds once, then every 30 seconds refreshes
    ``wifi_state`` and, if the mode is ``'disconnected'``, attempts to
    restore connectivity (client WiFi first, access point as fallback).
    Any exception raised in one cycle is logged and the loop continues.
    Presumably meant to run in a background thread.
    """
    log.info(f'WiFi-Monitor: warte {WIFI_BOOT_WAIT}s auf Verbindung...')
    time.sleep(WIFI_BOOT_WAIT)

    while True:
        try:
            update_wifi_state()
            with wifi_lock:
                current_mode = wifi_state['mode']

            if current_mode == 'disconnected':
                _restore_connectivity()
        except Exception as exc:
            log.error(f'WiFi-Monitor Fehler: {exc}')

        # Sleep outside the try block so the pause happens after every cycle,
        # including ones that failed.
        time.sleep(30)
|