"""PiCopy – NAS-Upload (rclone): upload_state, upload_lock, alle rclone-Helpers, run_uploads.""" import json import posixpath import re import select import subprocess import threading import time from pathlib import Path from picopy.config import BASE_DIR, load_cfg, log from picopy.state import add_log RCLONE_CONF = BASE_DIR / 'rclone.conf' upload_state = { 'running': False, 'current': '', 'results': [], 'progress': 0, 'total': 0, 'done': 0, 'bytes_total': 0, 'bytes_done': 0, 'current_file': '', 'eta_sec': None, 'speed_bps': 0, } upload_lock = threading.Lock() def _rclone(*args, timeout=60): try: return subprocess.run( ['rclone', '--config', str(RCLONE_CONF)] + list(args), capture_output=True, text=True, timeout=timeout ) except subprocess.TimeoutExpired: return subprocess.CompletedProcess(args, 1, stdout='', stderr=f'Timeout nach {timeout}s') except Exception as e: return subprocess.CompletedProcess(args, 1, stdout='', stderr=str(e)) def _rclone_obscure(pw): r = subprocess.run(['rclone', 'obscure', pw], capture_output=True, text=True, timeout=10) return r.stdout.strip() def _parse_percent(text: str): m = re.search(r'(\d+(?:\.\d+)?)%', text) if not m: return None try: return max(0.0, min(100.0, float(m.group(1)))) except ValueError: return None def _rclone_copyto_progress(src: Path, dest: str, base_done: int, file_size: int, total_bytes: int, start_ts: float, timeout: int = 7200): args = [ 'rclone', '--config', str(RCLONE_CONF), 'copyto', str(src), dest, '--retries', '1', '--progress', '--stats', '1s', '--stats-one-line', ] try: p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1) started = time.time() stderr_parts = [] buf = '' while True: if p.poll() is not None: break if time.time() - started > timeout: p.kill() return subprocess.CompletedProcess(args, 1, stdout='', stderr=f'Timeout nach {timeout}s') ready, _, _ = select.select([p.stderr], [], [], 0.2) if p.stderr else ([], [], []) if not ready: time.sleep(0.1) continue chunk = p.stderr.read(1) if not chunk: continue stderr_parts.append(chunk) if chunk not in ('\r', '\n'): buf += chunk continue pct = _parse_percent(buf) buf = '' if pct is not None: transferred = int(file_size * pct / 100) bytes_done = base_done + transferred elapsed = time.time() - start_ts speed = bytes_done / elapsed if elapsed > 1 else 0 eta = int((total_bytes - bytes_done) / speed) if speed > 0 and total_bytes > bytes_done else 0 with upload_lock: upload_state.update(bytes_done=bytes_done, progress=int(bytes_done / total_bytes * 100) if total_bytes else 100, speed_bps=int(speed), eta_sec=eta) stdout, stderr_tail = p.communicate(timeout=5) if stderr_tail: stderr_parts.append(stderr_tail) return subprocess.CompletedProcess(args, p.returncode, stdout=stdout or '', stderr=''.join(stderr_parts)) except subprocess.TimeoutExpired: return subprocess.CompletedProcess(args, 1, stdout='', stderr=f'Timeout nach {timeout}s') except Exception as e: return subprocess.CompletedProcess(args, 1, stdout='', stderr=str(e)) def _remote_name(tid): return f'picopy_{tid}' def _join_remote_path(*parts) -> str: return '/'.join(str(p).strip('/') for p in parts if str(p).strip('/')) def _remote_exists(remote_path: str) -> bool: return _remote_size(remote_path) is not None def _remote_size(remote_path: str): r = _rclone('lsjson', remote_path, timeout=20) if r.returncode != 0: return None try: data = json.loads(r.stdout or '[]') if isinstance(data, dict): return data.get('Size') if isinstance(data, list) and data: item = data[0] return item.get('Size') if isinstance(item, dict) else None return None except (json.JSONDecodeError, ValueError): return None def _remote_unique_rel_path(t: dict, rel_path: str) -> str: if not _remote_exists(_smb_conn(t, rel_path)): return rel_path parent = posixpath.dirname(rel_path) name = posixpath.basename(rel_path) stem, suffix = posixpath.splitext(name) i = 1 while True: candidate_name = f'{stem}_({i}){suffix}' candidate = _join_remote_path(parent, candidate_name) if not _remote_exists(_smb_conn(t, candidate)): return candidate i += 1 def _smb_conn(t: dict, path: str = '') -> str: """Baut ein rclone-Ziel fuer gespeicherte SMB-Targets. Bei rclone SMB ist die Freigabe der erste Pfadteil nach dem Remote: remote:share/ordner. Die Remote-Konfiguration enthaelt Host und Login. """ share = t.get('smb_share', '') remote_path = _join_remote_path(share, path) if t.get('id'): return f'{_remote_name(t["id"])}:{remote_path}' host = t.get('smb_host', '') if not host: return f':{remote_path}' conn = f':smb,host={host}' if t.get('smb_user'): conn += f',user={t["smb_user"]}' if t.get('smb_pass'): conn += f',pass={t["smb_pass"]}' conn += f':{remote_path}' return conn def configure_smb_remote(tid, host, share, user, pw): rn = _remote_name(tid) _rclone('config', 'delete', rn) args = ['config', 'create', rn, 'smb', f'host={host}'] if user: args += [f'user={user}'] if pw: args += [f'pass={_rclone_obscure(pw)}'] r = _rclone(*args) return r.returncode == 0, r.stderr.strip() def delete_remote(tid): _rclone('config', 'delete', _remote_name(tid)) def test_remote(tid): cfg = load_cfg() targets = cfg.get('upload_targets', []) t = next((x for x in targets if x['id'] == tid), {'id': tid}) dest_root = t.get('dest_path', 'PiCopy').strip('/') root = _smb_conn(t) dest = _smb_conn(t, dest_root) test_dir_name = '.picopy_writetest' test_dir = _smb_conn(t, f'{dest_root}/{test_dir_name}' if dest_root else test_dir_name) # 1. Verbindung prüfen r = _rclone('lsd', root, timeout=15) if r.returncode != 0: err = r.stderr.strip().splitlines()[-1] if r.stderr.strip() else 'Verbindung fehlgeschlagen' return False, f'Verbindung: {err}' # 2. Zielordner und Schreibzugriff prüfen: Ziel anlegen, Testverzeichnis anlegen + sofort löschen mk = _rclone('mkdir', dest, timeout=15) if mk.returncode != 0: err = mk.stderr.strip().splitlines()[-1] if mk.stderr.strip() else 'Zielordner konnte nicht angelegt werden' return False, f'Zielordner: {err}' rw = _rclone('mkdir', test_dir, timeout=15) if rw.returncode != 0: err = rw.stderr.strip().splitlines()[-1] if rw.stderr.strip() else 'Schreiben fehlgeschlagen' return False, f'Kein Schreibzugriff: {err}' _rclone('rmdir', test_dir, timeout=10) return True, '' def run_uploads(local_dir: Path, cfg: dict, upload_files=None): """Lädt die zuletzt lokal geschriebenen Dateien zu allen aktiven Fernzielen hoch.""" # Frische Config laden damit zwischenzeitliche Änderungen (z.B. Deaktivierung) berücksichtigt werden current_cfg = load_cfg() targets = [t for t in current_cfg.get('upload_targets', []) if t.get('enabled')] if not targets: return with upload_lock: upload_state.update(running=True, results=[], current='', progress=0, total=0, done=0, bytes_total=0, bytes_done=0, current_file='', eta_sec=None, speed_bps=0) for t in targets: name = t.get('name', t['id']) with upload_lock: upload_state.update(current=name, progress=0, total=0, done=0, bytes_total=0, bytes_done=0, current_file='', eta_sec=None, speed_bps=0) add_log(f'Upload >> {name}...') dest_root = t.get('dest_path', 'PiCopy').strip('/') root = _smb_conn(t) # local_dir ist der lokal erzeugte Datumsordner. Auf dem NAS soll die # gleiche Struktur entstehen wie auf dem Ziellaufwerk: Ziel/Datum/... dest_rel = _join_remote_path(dest_root, local_dir.name) dest = _smb_conn(t, dest_rel) share = t.get('smb_share', '') dest_label = _join_remote_path(share, dest_rel) or '/' add_log(f'Upload {name}: Ziel {dest_label}') # Quellverzeichnis prüfen if not local_dir.exists(): err = f'Quellverzeichnis nicht gefunden: {local_dir}' add_log(f'Upload {name}: ✗ {err}') with upload_lock: upload_state['results'].append({'name': name, 'ok': False, 'msg': err}) continue # 1. Verbindung prüfen conn = _rclone('lsd', root, timeout=15) add_log(f'Upload {name}: Verbindung rc={conn.returncode}') if conn.returncode != 0: err = (conn.stderr.strip().splitlines()[-1] if conn.stderr.strip() else 'NAS nicht erreichbar') add_log(f'Upload {name}: ✗ {err}') with upload_lock: upload_state['results'].append({'name': name, 'ok': False, 'msg': err}) continue # 2. Zielordner anlegen mk = _rclone('mkdir', dest, timeout=30) add_log(f'Upload {name}: mkdir rc={mk.returncode}' + (f' err={mk.stderr.strip()[:100]}' if mk.returncode != 0 else '')) # 3. Kopieren mit Fortschritt add_log(f'Upload {name}: starte copy von {local_dir}') dup_mode = cfg.get('duplicate_handling', 'skip') if upload_files is None: files = sorted(f for f in local_dir.rglob('*') if f.is_file()) else: files = [] for f in upload_files: f = Path(f) try: f.relative_to(local_dir) except ValueError: continue if f.is_file(): files.append(f) files = sorted(files) dirs = sorted({p for f in files for p in f.relative_to(local_dir).parents if str(p) != '.'}) bytes_total = sum(f.stat().st_size for f in files) with upload_lock: upload_state.update(total=len(files), bytes_total=bytes_total, progress=100 if not files else 0) for d in dirs: _rclone('mkdir', _smb_conn(t, _join_remote_path(dest_rel, d.as_posix())), timeout=30) errors = [] skipped = 0 start_ts = time.time() for idx, f in enumerate(files, start=1): rel = f.relative_to(local_dir).as_posix() fsize = f.stat().st_size remote_rel = _join_remote_path(dest_rel, rel) with upload_lock: upload_state.update(done=idx, current_file=rel, progress=int(idx / len(files) * 100) if files else 100) if dup_mode == 'skip': remote_size = _remote_size(_smb_conn(t, remote_rel)) if remote_size == fsize: skipped += 1 with upload_lock: bd = upload_state['bytes_done'] + fsize elapsed = time.time() - start_ts speed = bd / elapsed if elapsed > 1 else 0 eta = int((bytes_total - bd) / speed) if speed > 0 and bytes_total > bd else 0 upload_state.update(bytes_done=bd, progress=int(bd / bytes_total * 100) if bytes_total else 100, speed_bps=int(speed), eta_sec=eta) continue elif dup_mode == 'rename': remote_rel = _remote_unique_rel_path(t, remote_rel) with upload_lock: base_done = upload_state['bytes_done'] rr = _rclone_copyto_progress(f, _smb_conn(t, remote_rel), base_done, fsize, bytes_total, start_ts) if rr.returncode != 0: errors.append(rr.stderr.strip() or f'{rel}: unbekannter Fehler') if len(errors) >= 5: break with upload_lock: bd = base_done + fsize elapsed = time.time() - start_ts speed = bd / elapsed if elapsed > 1 else 0 eta = int((bytes_total - bd) / speed) if speed > 0 and bytes_total > bd else 0 upload_state.update(bytes_done=bd, progress=int(bd / bytes_total * 100) if bytes_total else 100, speed_bps=int(speed), eta_sec=eta) r = subprocess.CompletedProcess( args=['rclone', 'copyto'], returncode=1 if errors else 0, stdout='', stderr='\n'.join(errors), ) ok = r.returncode == 0 err = '' if not ok: err = r.stderr.strip() or 'Unbekannter Fehler' add_log(f'Upload {name}: rclone stderr: {err[:300]}') elif skipped: add_log(f'Upload {name}: {skipped} Dateien übersprungen') with upload_lock: upload_state['results'].append({'name': name, 'ok': ok, 'msg': err}) add_log(f'Upload {name}: {"✓ OK" if ok else "✗ Fehler - " + err}') with upload_lock: upload_state['running'] = False upload_state['current'] = '' upload_state['current_file'] = ''