153 lines
4.6 KiB
Python
153 lines
4.6 KiB
Python
"""PiCopy – WireGuard VPN: wg_state, wg_lock, alle wg_* Funktionen, wg_monitor."""
|
||
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import threading
|
||
import time
|
||
from pathlib import Path
|
||
|
||
from picopy.config import log
|
||
|
||
WG_CONF = Path('/etc/wireguard/picopy.conf')
|
||
WG_IFACE = 'picopy'
|
||
|
||
|
||
def wg_is_installed():
|
||
return shutil.which('wg-quick') is not None
|
||
|
||
|
||
wg_state = {
|
||
'connected': False,
|
||
'ip': '',
|
||
'peer': '',
|
||
'error': None,
|
||
'has_config': False,
|
||
'installed': False,
|
||
'pkg_running': False,
|
||
'pkg_action': '',
|
||
'pkg_error': None,
|
||
}
|
||
wg_lock = threading.Lock()
|
||
|
||
|
||
def wg_update_state():
|
||
inst = wg_is_installed()
|
||
has_conf = WG_CONF.exists()
|
||
if not inst:
|
||
with wg_lock:
|
||
wg_state.update(installed=False, connected=False, ip='', peer='',
|
||
has_config=has_conf)
|
||
return
|
||
r = subprocess.run(['wg', 'show', WG_IFACE],
|
||
capture_output=True, text=True, timeout=5)
|
||
if r.returncode != 0:
|
||
with wg_lock:
|
||
wg_state.update(installed=True, connected=False, ip='', peer='',
|
||
has_config=has_conf)
|
||
return
|
||
ip_r = subprocess.run(['ip', '-4', 'addr', 'show', WG_IFACE],
|
||
capture_output=True, text=True, timeout=5)
|
||
ip = ''
|
||
for line in ip_r.stdout.splitlines():
|
||
if line.strip().startswith('inet '):
|
||
ip = line.strip().split()[1].split('/')[0]
|
||
break
|
||
peer = ''
|
||
for line in r.stdout.splitlines():
|
||
if line.startswith('peer:'):
|
||
peer = line.split(':', 1)[-1].strip()
|
||
break
|
||
with wg_lock:
|
||
wg_state.update(installed=True, connected=True, ip=ip, peer=peer,
|
||
error=None, has_config=has_conf)
|
||
|
||
|
||
def wg_connect():
|
||
if not WG_CONF.exists():
|
||
with wg_lock:
|
||
wg_state['error'] = 'Keine Konfiguration vorhanden'
|
||
return False
|
||
r = subprocess.run(['wg-quick', 'up', WG_IFACE],
|
||
capture_output=True, text=True, timeout=30)
|
||
if r.returncode == 0:
|
||
time.sleep(1)
|
||
wg_update_state()
|
||
log.info('WireGuard verbunden')
|
||
return True
|
||
lines = r.stderr.strip().splitlines() if r.stderr.strip() else []
|
||
real_errors = [l for l in lines if not l.strip().startswith('[#]')]
|
||
err = (real_errors[-1] if real_errors else lines[-1] if lines else 'Unbekannter Fehler')
|
||
if 'resolvconf' in err and 'not found' in err:
|
||
err = 'resolvconf fehlt - bitte WireGuard deinstallieren und neu installieren (openresolv wird dann mitinstalliert)'
|
||
with wg_lock:
|
||
wg_state.update(connected=False, error=err)
|
||
log.error(f'WireGuard Fehler: {err}')
|
||
return False
|
||
|
||
|
||
def wg_disconnect():
|
||
r = subprocess.run(['wg-quick', 'down', WG_IFACE],
|
||
capture_output=True, text=True, timeout=15)
|
||
with wg_lock:
|
||
wg_state.update(connected=False, ip='', peer='', error=None)
|
||
log.info('WireGuard getrennt')
|
||
return r.returncode == 0
|
||
|
||
|
||
def _wg_apt(action: str, packages: list):
|
||
"""Führt apt-get install/remove aus und aktualisiert pkg_state."""
|
||
with wg_lock:
|
||
if wg_state['pkg_running']:
|
||
return
|
||
wg_state.update(pkg_running=True, pkg_action=action, pkg_error=None)
|
||
try:
|
||
cmd = ['apt-get', action, '-y'] + packages
|
||
r = subprocess.run(cmd, capture_output=True, text=True, timeout=300,
|
||
env={**os.environ, 'DEBIAN_FRONTEND': 'noninteractive'})
|
||
if r.returncode != 0:
|
||
err = (r.stderr.strip().splitlines()[-1]
|
||
if r.stderr.strip() else f'apt-get {action} fehlgeschlagen')
|
||
log.error(f'WireGuard apt {action}: {err}')
|
||
with wg_lock:
|
||
wg_state['pkg_error'] = err
|
||
else:
|
||
log.info(f'WireGuard apt {action} abgeschlossen')
|
||
except Exception as e:
|
||
with wg_lock:
|
||
wg_state['pkg_error'] = str(e)
|
||
finally:
|
||
with wg_lock:
|
||
wg_state['pkg_running'] = False
|
||
wg_state['pkg_action'] = ''
|
||
wg_update_state()
|
||
|
||
|
||
def wg_install():
|
||
_wg_apt('install', ['wireguard', 'wireguard-tools', 'openresolv'])
|
||
|
||
|
||
def wg_uninstall():
|
||
wg_disconnect()
|
||
_wg_apt('remove', ['wireguard', 'wireguard-tools'])
|
||
|
||
|
||
def wg_save_config(content: str):
|
||
try:
|
||
WG_CONF.parent.mkdir(parents=True, exist_ok=True)
|
||
WG_CONF.write_text(content, encoding='utf-8')
|
||
WG_CONF.chmod(0o600)
|
||
return True, ''
|
||
except Exception as e:
|
||
return False, str(e)
|
||
|
||
|
||
def wg_monitor():
|
||
while True:
|
||
try:
|
||
wg_update_state()
|
||
except Exception:
|
||
pass
|
||
time.sleep(10)
|