feat: lokale Routing-Regeln für WireGuard hinzugefügt und Versionsnummer auf 1.0.31 erhöht

This commit is contained in:
2026-05-09 12:39:11 +02:00
parent 5dfe632a81
commit 0c14da3435
2 changed files with 73 additions and 58 deletions

129
app.py
View File

@@ -327,61 +327,32 @@ def wg_update_state():
error=None, has_config=has_conf) error=None, has_config=has_conf)
import ipaddress as _ipaddress # Helper-Script das wg-quick via PostUp/PreDown aufruft
_WG_ROUTE_SCRIPT = BASE_DIR / 'wg_route.py'
_WG_ROUTE_PY = '''\
#!/usr/bin/env python3
"""PiCopy: lokale IPs bei aktivem WireGuard-VPN erreichbar halten."""
import sys, subprocess, ipaddress
_wg_managed_subnets: list = [] # beim Connect gemerkte Subnetze für sauberes Cleanup def local_nets():
r = subprocess.run(["ip", "-4", "addr", "show"], capture_output=True, text=True)
seen, res, iface = set(), [], ""
for ln in r.stdout.splitlines():
if not ln[:1].isspace():
iface = ln.split(":")[1].strip().split("@")[0] if ":" in ln else ""
elif iface and iface != "lo" and ln.strip().startswith("inet "):
try:
net = str(ipaddress.IPv4Interface(ln.split()[1]).network)
if net not in seen: seen.add(net); res.append(net)
except Exception: pass
return res
up = len(sys.argv) < 2 or sys.argv[1] != "down"
def _local_subnets_before_vpn() -> list: for i, net in enumerate(local_nets()):
"""Subnetze aller lokalen Interfaces (außer Loopback) via 'ip addr show'. cmd = "add" if up else "del"
Muss VOR wg-quick up aufgerufen werden, damit wg-quick die Tabelle noch subprocess.run(["ip","rule",cmd,"from",net,"table","main","priority",str(100+i*2)], capture_output=True)
nicht verändert hat.""" subprocess.run(["ip","rule",cmd,"to", net,"table","main","priority",str(101+i*2)], capture_output=True)
r = subprocess.run(['ip', '-4', 'addr', 'show'], capture_output=True, text=True) '''
seen, result = set(), []
current_iface = ''
for line in r.stdout.splitlines():
if not line[:1].isspace():
# Zeile wie "2: wlan0: <BROADCAST,..."
current_iface = line.split(':')[1].strip().split('@')[0] if ':' in line else ''
elif current_iface and current_iface != 'lo':
s = line.strip()
if s.startswith('inet '):
try:
net = str(_ipaddress.IPv4Interface(s.split()[1]).network)
if net not in seen:
seen.add(net)
result.append(net)
except Exception:
pass
return result
def _wg_add_local_routes(subnets: list):
"""Alle lokalen Subnetze (Hotspot, WLAN, LAN) vom VPN-Tunnel ausschließen."""
global _wg_managed_subnets
_wg_managed_subnets = subnets
for i, subnet in enumerate(subnets):
prio_from = 100 + i * 2
prio_to = 101 + i * 2
subprocess.run(['ip', 'rule', 'add', 'from', subnet,
'table', 'main', 'priority', str(prio_from)], capture_output=True)
subprocess.run(['ip', 'rule', 'add', 'to', subnet,
'table', 'main', 'priority', str(prio_to)], capture_output=True)
log.info(f'Lokale Routing-Regeln gesetzt: {subnets}')
def _wg_remove_local_routes():
"""Routing-Regeln wieder entfernen."""
global _wg_managed_subnets
for i, subnet in enumerate(_wg_managed_subnets):
prio_from = 100 + i * 2
prio_to = 101 + i * 2
subprocess.run(['ip', 'rule', 'del', 'from', subnet,
'table', 'main', 'priority', str(prio_from)], capture_output=True)
subprocess.run(['ip', 'rule', 'del', 'to', subnet,
'table', 'main', 'priority', str(prio_to)], capture_output=True)
log.info(f'Lokale Routing-Regeln entfernt: {_wg_managed_subnets}')
_wg_managed_subnets = []
def wg_connect(): def wg_connect():
@@ -389,17 +360,15 @@ def wg_connect():
with wg_lock: with wg_lock:
wg_state['error'] = 'Keine Konfiguration vorhanden' wg_state['error'] = 'Keine Konfiguration vorhanden'
return False return False
local_nets = _local_subnets_before_vpn() # VOR wg-quick lesen _wg_inject_routing(WG_CONF.read_text(encoding='utf-8')) # PostUp/PreDown sicherstellen
r = subprocess.run(['wg-quick', 'up', WG_IFACE], r = subprocess.run(['wg-quick', 'up', WG_IFACE],
capture_output=True, text=True, timeout=30) capture_output=True, text=True, timeout=30)
if r.returncode == 0: if r.returncode == 0:
time.sleep(1) time.sleep(1)
_wg_add_local_routes(local_nets)
wg_update_state() wg_update_state()
log.info('WireGuard verbunden') log.info('WireGuard verbunden')
return True return True
lines = r.stderr.strip().splitlines() if r.stderr.strip() else [] lines = r.stderr.strip().splitlines() if r.stderr.strip() else []
# [#]-Zeilen sind wg-quick Shell-Traces, keine echten Fehlermeldungen
real_errors = [l for l in lines if not l.strip().startswith('[#]')] real_errors = [l for l in lines if not l.strip().startswith('[#]')]
err = (real_errors[-1] if real_errors else lines[-1] if lines else 'Unbekannter Fehler') err = (real_errors[-1] if real_errors else lines[-1] if lines else 'Unbekannter Fehler')
if 'resolvconf' in err and 'not found' in err: if 'resolvconf' in err and 'not found' in err:
@@ -411,7 +380,6 @@ def wg_connect():
def wg_disconnect(): def wg_disconnect():
_wg_remove_local_routes()
r = subprocess.run(['wg-quick', 'down', WG_IFACE], r = subprocess.run(['wg-quick', 'down', WG_IFACE],
capture_output=True, text=True, timeout=15) capture_output=True, text=True, timeout=15)
with wg_lock: with wg_lock:
@@ -457,9 +425,56 @@ def wg_uninstall():
_wg_apt('remove', ['wireguard', 'wireguard-tools']) _wg_apt('remove', ['wireguard', 'wireguard-tools'])
def _wg_inject_routing(content: str) -> str:
"""Schreibt den Helper-Script und injiziert PostUp/PreDown in die Config,
falls 0.0.0.0/0 enthalten ist (Full-Tunnel). Gibt den ggf. geänderten
Config-Text zurück und schreibt ihn direkt in WG_CONF."""
if '0.0.0.0/0' not in content:
return content
try:
_WG_ROUTE_SCRIPT.write_text(_WG_ROUTE_PY, encoding='utf-8')
_WG_ROUTE_SCRIPT.chmod(0o755)
except Exception as e:
log.warning(f'wg_route.py konnte nicht geschrieben werden: {e}')
return content
post = f'PostUp = python3 {_WG_ROUTE_SCRIPT} up'
pred = f'PreDown = python3 {_WG_ROUTE_SCRIPT} down'
tag = str(_WG_ROUTE_SCRIPT)
# Bereits vorhanden → nichts tun
if tag in content:
return content
lines, new_lines, in_iface, done = content.splitlines(), [], False, False
for line in lines:
s = line.strip().lower()
# Alte PostUp/PreDown von uns entfernen (falls Pfad geändert)
if (s.startswith('postup') or s.startswith('predown')) and 'wg_route' in line:
continue
if line.strip() == '[Interface]':
in_iface = True
elif in_iface and line.strip().startswith('[') and not done:
new_lines += [post, pred, '']
in_iface = False
done = True
new_lines.append(line)
if in_iface and not done:
new_lines += [post, pred]
new_content = '\n'.join(new_lines)
try:
WG_CONF.write_text(new_content, encoding='utf-8')
WG_CONF.chmod(0o600)
except Exception as e:
log.warning(f'WG_CONF konnte nicht aktualisiert werden: {e}')
return new_content
def wg_save_config(content: str): def wg_save_config(content: str):
try: try:
WG_CONF.parent.mkdir(parents=True, exist_ok=True) WG_CONF.parent.mkdir(parents=True, exist_ok=True)
content = _wg_inject_routing(content)
WG_CONF.write_text(content, encoding='utf-8') WG_CONF.write_text(content, encoding='utf-8')
WG_CONF.chmod(0o600) WG_CONF.chmod(0o600)
return True, '' return True, ''

View File

@@ -1 +1 @@
1.0.30 1.0.31