# NOTE: removed non-Python repository-page chrome (file listing header and
# Unicode warning banner) that was pasted in above the module source.
"""PiCopy NAS-Upload (rclone): upload_state, upload_lock, alle rclone-Helpers, run_uploads."""
import json
import posixpath
import re
import select
import subprocess
import threading
import time
from pathlib import Path
from picopy.config import BASE_DIR, load_cfg, log
from picopy.state import add_log
RCLONE_CONF = BASE_DIR / 'rclone.conf'
# Shared snapshot of the running upload, read by the UI/status side and
# written by run_uploads() / _rclone_copyto_progress(). Every access must
# hold upload_lock.
upload_state = {
    'running': False,    # an upload cycle is currently active
    'current': '',       # name of the target currently being uploaded to
    'results': [],       # per-target outcomes: {'name', 'ok', 'msg'}
    'progress': 0,       # overall progress for the current target, 0..100
    'total': 0,          # number of files to upload for the current target
    'done': 0,           # 1-based index of the file currently processed
    'bytes_total': 0,    # sum of all file sizes for the current target
    'bytes_done': 0,     # bytes transferred (incl. skipped files) so far
    'current_file': '',  # relative path of the file in flight
    'eta_sec': None,     # estimated remaining seconds (None until known)
    'speed_bps': 0,      # average transfer speed since start, bytes/second
}
# Guards every read/write of upload_state across worker and web threads.
upload_lock = threading.Lock()
def _rclone(*args, timeout=60):
    """Run one rclone subcommand against the PiCopy config; never raises.

    Returns the real CompletedProcess on success; on timeout or any
    OS-level failure (e.g. missing binary) a synthetic CompletedProcess
    with returncode 1 and the error text in stderr is returned instead.
    """
    cmd = ['rclone', '--config', str(RCLONE_CONF), *args]
    try:
        return subprocess.run(cmd, capture_output=True,
                              text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return subprocess.CompletedProcess(args, 1, stdout='',
                                           stderr=f'Timeout nach {timeout}s')
    except Exception as exc:
        return subprocess.CompletedProcess(args, 1, stdout='', stderr=str(exc))
def _rclone_obscure(pw):
    """Obscure a plain-text password via `rclone obscure`.

    Returns the obscured value from stdout, stripped. NOTE(review): unlike
    _rclone() this call is not wrapped, so a missing binary or timeout raises.
    """
    proc = subprocess.run(
        ['rclone', 'obscure', pw],
        capture_output=True, text=True, timeout=10,
    )
    return proc.stdout.strip()
def _parse_percent(text: str):
m = re.search(r'(\d+(?:\.\d+)?)%', text)
if not m:
return None
try:
return max(0.0, min(100.0, float(m.group(1))))
except ValueError:
return None
def _rclone_copyto_progress(src: Path, dest: str, base_done: int,
                            file_size: int, total_bytes: int, start_ts: float,
                            timeout: int = 7200):
    """Copy one file via `rclone copyto`, streaming progress into upload_state.

    Parses the percentage from rclone's '--stats-one-line' output on stderr
    and updates bytes_done/progress/speed_bps/eta_sec under upload_lock while
    the copy runs.

    Args:
        src: local source file.
        dest: full rclone destination (remote:share/path).
        base_done: bytes already accounted for before this file started.
        file_size: size of *src* in bytes.
        total_bytes: total byte count of the whole batch (for percentages).
        start_ts: batch start time (time.time()), for speed/ETA.
        timeout: hard wall-clock limit for this single copy, seconds.

    Returns a CompletedProcess; on timeout or any unexpected error a
    synthetic CompletedProcess with returncode 1 is returned instead of
    raising.
    """
    args = [
        'rclone', '--config', str(RCLONE_CONF),
        'copyto', str(src), dest,
        '--retries', '1',
        '--progress',
        '--stats', '1s',
        '--stats-one-line',
    ]
    try:
        # Line-buffered text mode; rclone writes the progress stats to stderr.
        p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             text=True, bufsize=1)
        started = time.time()
        stderr_parts = []
        buf = ''  # accumulates the current stderr line up to \r or \n
        while True:
            if p.poll() is not None:
                break
            if time.time() - started > timeout:
                p.kill()
                return subprocess.CompletedProcess(args, 1, stdout='', stderr=f'Timeout nach {timeout}s')
            # Poll stderr without blocking so the timeout check stays responsive.
            ready, _, _ = select.select([p.stderr], [], [], 0.2) if p.stderr else ([], [], [])
            if not ready:
                time.sleep(0.1)
                continue
            # Read one character at a time: the progress display is redrawn
            # with \r, so "lines" end on \r as well as \n.
            chunk = p.stderr.read(1)
            if not chunk:
                continue
            stderr_parts.append(chunk)
            if chunk not in ('\r', '\n'):
                buf += chunk
                continue
            # End of a (progress) line -> try to extract the percentage.
            pct = _parse_percent(buf)
            buf = ''
            if pct is not None:
                transferred = int(file_size * pct / 100)
                bytes_done = base_done + transferred
                elapsed = time.time() - start_ts
                # Suppress speed/ETA during the first second to avoid a spike.
                speed = bytes_done / elapsed if elapsed > 1 else 0
                eta = int((total_bytes - bytes_done) / speed) if speed > 0 and total_bytes > bytes_done else 0
                with upload_lock:
                    upload_state.update(bytes_done=bytes_done,
                                        progress=int(bytes_done / total_bytes * 100) if total_bytes else 100,
                                        speed_bps=int(speed), eta_sec=eta)
        # Process exited: drain any remaining output.
        stdout, stderr_tail = p.communicate(timeout=5)
        if stderr_tail:
            stderr_parts.append(stderr_tail)
        return subprocess.CompletedProcess(args, p.returncode, stdout=stdout or '',
                                           stderr=''.join(stderr_parts))
    except subprocess.TimeoutExpired:
        return subprocess.CompletedProcess(args, 1, stdout='', stderr=f'Timeout nach {timeout}s')
    except Exception as e:
        return subprocess.CompletedProcess(args, 1, stdout='', stderr=str(e))
def _remote_name(tid):
return f'picopy_{tid}'
def _join_remote_path(*parts) -> str:
return '/'.join(str(p).strip('/') for p in parts if str(p).strip('/'))
def _remote_exists(remote_path: str) -> bool:
    """True when `rclone lsjson` reports an object at *remote_path*."""
    size = _remote_size(remote_path)
    return size is not None
def _remote_size(remote_path: str):
    """Return the size in bytes of the object at *remote_path*, or None.

    Queries `rclone lsjson`; None means the path is missing, not listable,
    or the output could not be parsed. NOTE(review): directories appear to
    be treated as existing too (whatever Size lsjson reports) — confirm
    against rclone's lsjson output for the SMB backend.
    """
    r = _rclone('lsjson', remote_path, timeout=20)
    if r.returncode != 0:
        return None
    # Keep the try body minimal: only the JSON parse can raise here.
    # json.JSONDecodeError is a ValueError subclass, so one catch suffices.
    try:
        data = json.loads(r.stdout or '[]')
    except ValueError:
        return None
    if isinstance(data, dict):
        return data.get('Size')
    if isinstance(data, list) and data:
        item = data[0]
        return item.get('Size') if isinstance(item, dict) else None
    return None
def _remote_unique_rel_path(t: dict, rel_path: str) -> str:
    """Return *rel_path*, or a 'name_(N).ext' variant that is free remotely.

    Probes the remote of target *t* and increments N until a non-existing
    candidate is found (used for duplicate_handling == 'rename').
    """
    if not _remote_exists(_smb_conn(t, rel_path)):
        return rel_path
    parent = posixpath.dirname(rel_path)
    stem, suffix = posixpath.splitext(posixpath.basename(rel_path))
    counter = 1
    while True:
        candidate = _join_remote_path(parent, f'{stem}_({counter}){suffix}')
        if not _remote_exists(_smb_conn(t, candidate)):
            return candidate
        counter += 1
def _smb_conn(t: dict, path: str = '') -> str:
    """Build an rclone destination string for a stored SMB target.

    With rclone's SMB backend the share is the first path element after the
    remote (remote:share/folder); host and credentials live in the remote
    configuration. Targets without an id fall back to an on-the-fly
    ':smb,host=...' connection string built from the target dict.
    """
    remote_path = _join_remote_path(t.get('smb_share', ''), path)
    tid = t.get('id')
    if tid:
        # Named remote previously created by configure_smb_remote().
        return f'{_remote_name(tid)}:{remote_path}'
    host = t.get('smb_host', '')
    if not host:
        return f':{remote_path}'
    # On-the-fly connection string with inline credentials.
    # NOTE(review): smb_pass is passed through as stored — rclone expects
    # an obscured value in `pass=`; confirm what the config stores here.
    pieces = [f':smb,host={host}']
    user = t.get('smb_user')
    if user:
        pieces.append(f',user={user}')
    pw = t.get('smb_pass')
    if pw:
        pieces.append(f',pass={pw}')
    pieces.append(f':{remote_path}')
    return ''.join(pieces)
def configure_smb_remote(tid, host, share, user, pw):
    """(Re)create the stored rclone SMB remote for target *tid*.

    Any existing remote with the same name is deleted first; the password
    is stored obscured. Returns (ok, stderr_message). The *share* argument
    is accepted for interface compatibility but not stored in the remote
    (the share is part of the path at connect time, see _smb_conn).
    """
    remote = _remote_name(tid)
    _rclone('config', 'delete', remote)
    create_args = ['config', 'create', remote, 'smb', f'host={host}']
    if user:
        create_args.append(f'user={user}')
    if pw:
        create_args.append(f'pass={_rclone_obscure(pw)}')
    result = _rclone(*create_args)
    return result.returncode == 0, result.stderr.strip()
def delete_remote(tid):
    """Drop the stored rclone remote for target *tid* (best effort)."""
    remote = _remote_name(tid)
    _rclone('config', 'delete', remote)
def test_remote(tid):
    """Check connectivity and write access for upload target *tid*.

    Looks the target up in the current config, then: (1) lists the remote
    root to verify the connection, (2) creates the destination folder,
    (3) creates and removes a hidden test directory to prove write access.

    Returns (ok, message); message is a German user-facing error on failure.
    """
    cfg = load_cfg()
    targets = cfg.get('upload_targets', [])
    # Use .get() so a malformed target entry without an 'id' key cannot
    # raise KeyError while scanning the list.
    t = next((x for x in targets if x.get('id') == tid), {'id': tid})
    dest_root = t.get('dest_path', 'PiCopy').strip('/')
    root = _smb_conn(t)
    dest = _smb_conn(t, dest_root)
    test_dir_name = '.picopy_writetest'
    test_dir = _smb_conn(t, f'{dest_root}/{test_dir_name}' if dest_root else test_dir_name)
    # 1. Check the connection
    r = _rclone('lsd', root, timeout=15)
    if r.returncode != 0:
        err = r.stderr.strip().splitlines()[-1] if r.stderr.strip() else 'Verbindung fehlgeschlagen'
        return False, f'Verbindung: {err}'
    # 2. Check destination + write access: create dest, then create and
    #    immediately delete a test directory.
    mk = _rclone('mkdir', dest, timeout=15)
    if mk.returncode != 0:
        err = mk.stderr.strip().splitlines()[-1] if mk.stderr.strip() else 'Zielordner konnte nicht angelegt werden'
        return False, f'Zielordner: {err}'
    rw = _rclone('mkdir', test_dir, timeout=15)
    if rw.returncode != 0:
        err = rw.stderr.strip().splitlines()[-1] if rw.stderr.strip() else 'Schreiben fehlgeschlagen'
        return False, f'Kein Schreibzugriff: {err}'
    # Cleanup is best effort; a leftover empty dir is harmless.
    _rclone('rmdir', test_dir, timeout=10)
    return True, ''
def run_uploads(local_dir: Path, cfg: dict, upload_files=None):
    """Upload the most recently written local files to all enabled targets.

    Args:
        local_dir: locally created date folder; its structure is mirrored
            on the NAS below each target's dest_path (dest/date/...).
        cfg: config snapshot from the caller. Kept for interface
            compatibility; a fresh config is loaded below so mid-run
            changes are honoured.
        upload_files: optional iterable of paths restricting the upload;
            entries outside local_dir are ignored. None uploads everything
            under local_dir.

    Progress and per-target results are published in upload_state (always
    under upload_lock) and via add_log().
    """
    # Reload the config so changes made in the meantime (e.g. a target
    # being disabled) are taken into account.
    current_cfg = load_cfg()
    targets = [t for t in current_cfg.get('upload_targets', []) if t.get('enabled')]
    if not targets:
        return
    with upload_lock:
        upload_state.update(running=True, results=[], current='',
                            progress=0, total=0, done=0,
                            bytes_total=0, bytes_done=0,
                            current_file='', eta_sec=None, speed_bps=0)
    for t in targets:
        # .get() chain so a malformed target entry cannot raise KeyError
        # (the original t.get('name', t['id']) evaluated t['id'] eagerly).
        name = t.get('name', t.get('id', '?'))
        with upload_lock:
            upload_state.update(current=name, progress=0, total=0, done=0,
                                bytes_total=0, bytes_done=0,
                                current_file='', eta_sec=None, speed_bps=0)
        add_log(f'Upload >> {name}...')
        dest_root = t.get('dest_path', 'PiCopy').strip('/')
        root = _smb_conn(t)
        # local_dir is the locally created date folder. The NAS should get
        # the same structure as the destination drive: dest/date/...
        dest_rel = _join_remote_path(dest_root, local_dir.name)
        dest = _smb_conn(t, dest_rel)
        share = t.get('smb_share', '')
        dest_label = _join_remote_path(share, dest_rel) or '/'
        add_log(f'Upload {name}: Ziel {dest_label}')
        # Check the source directory
        if not local_dir.exists():
            err = f'Quellverzeichnis nicht gefunden: {local_dir}'
            add_log(f'Upload {name}: ✗ {err}')
            with upload_lock:
                upload_state['results'].append({'name': name, 'ok': False, 'msg': err})
            continue
        # 1. Check the connection
        conn = _rclone('lsd', root, timeout=15)
        add_log(f'Upload {name}: Verbindung rc={conn.returncode}')
        if conn.returncode != 0:
            err = (conn.stderr.strip().splitlines()[-1] if conn.stderr.strip()
                   else 'NAS nicht erreichbar')
            add_log(f'Upload {name}: ✗ {err}')
            with upload_lock:
                upload_state['results'].append({'name': name, 'ok': False, 'msg': err})
            continue
        # 2. Create the destination folder
        mk = _rclone('mkdir', dest, timeout=30)
        add_log(f'Upload {name}: mkdir rc={mk.returncode}'
                + (f' err={mk.stderr.strip()[:100]}' if mk.returncode != 0 else ''))
        # 3. Copy with progress reporting
        add_log(f'Upload {name}: starte copy von {local_dir}')
        # Read duplicate handling from the FRESH config, not the stale cfg
        # snapshot passed in, so a mode changed mid-session takes effect
        # (consistent with the reload above).
        dup_mode = current_cfg.get('duplicate_handling', 'skip')
        if upload_files is None:
            files = sorted(f for f in local_dir.rglob('*') if f.is_file())
        else:
            files = []
            for f in upload_files:
                f = Path(f)
                try:
                    f.relative_to(local_dir)
                except ValueError:
                    continue  # path outside local_dir -> ignore
                if f.is_file():
                    files.append(f)
            files = sorted(files)
        # All parent directories that must exist on the remote side.
        dirs = sorted({p for f in files for p in f.relative_to(local_dir).parents
                       if str(p) != '.'})
        bytes_total = sum(f.stat().st_size for f in files)
        with upload_lock:
            upload_state.update(total=len(files), bytes_total=bytes_total,
                                progress=100 if not files else 0)
        for d in dirs:
            _rclone('mkdir', _smb_conn(t, _join_remote_path(dest_rel, d.as_posix())), timeout=30)
        errors = []
        skipped = 0
        start_ts = time.time()
        for idx, f in enumerate(files, start=1):
            rel = f.relative_to(local_dir).as_posix()
            fsize = f.stat().st_size
            remote_rel = _join_remote_path(dest_rel, rel)
            with upload_lock:
                upload_state.update(done=idx, current_file=rel,
                                    progress=int(idx / len(files) * 100) if files else 100)
            if dup_mode == 'skip':
                # Same size on the remote counts as "already uploaded".
                remote_size = _remote_size(_smb_conn(t, remote_rel))
                if remote_size == fsize:
                    skipped += 1
                    with upload_lock:
                        bd = upload_state['bytes_done'] + fsize
                        elapsed = time.time() - start_ts
                        speed = bd / elapsed if elapsed > 1 else 0
                        eta = int((bytes_total - bd) / speed) if speed > 0 and bytes_total > bd else 0
                        upload_state.update(bytes_done=bd,
                                            progress=int(bd / bytes_total * 100) if bytes_total else 100,
                                            speed_bps=int(speed), eta_sec=eta)
                    continue
            elif dup_mode == 'rename':
                remote_rel = _remote_unique_rel_path(t, remote_rel)
            with upload_lock:
                base_done = upload_state['bytes_done']
            rr = _rclone_copyto_progress(f, _smb_conn(t, remote_rel),
                                         base_done, fsize, bytes_total, start_ts)
            if rr.returncode != 0:
                errors.append(rr.stderr.strip() or f'{rel}: unbekannter Fehler')
                if len(errors) >= 5:
                    # Too many failures -> give up on this target.
                    break
            with upload_lock:
                bd = base_done + fsize
                elapsed = time.time() - start_ts
                speed = bd / elapsed if elapsed > 1 else 0
                eta = int((bytes_total - bd) / speed) if speed > 0 and bytes_total > bd else 0
                upload_state.update(bytes_done=bd,
                                    progress=int(bd / bytes_total * 100) if bytes_total else 100,
                                    speed_bps=int(speed), eta_sec=eta)
        # Summarize (the former synthetic CompletedProcess wrapper was dead
        # weight; ok/err are computed directly with identical values).
        ok = not errors
        err = ''
        if not ok:
            err = '\n'.join(errors).strip() or 'Unbekannter Fehler'
            add_log(f'Upload {name}: rclone stderr: {err[:300]}')
        elif skipped:
            add_log(f'Upload {name}: {skipped} Dateien übersprungen')
        with upload_lock:
            upload_state['results'].append({'name': name, 'ok': ok, 'msg': err})
        add_log(f'Upload {name}: {"✓ OK" if ok else "✗ Fehler - " + err}')
    with upload_lock:
        upload_state['running'] = False
        upload_state['current'] = ''
        upload_state['current_file'] = ''