416 lines
16 KiB
Python
416 lines
16 KiB
Python
"""PiCopy – Kopierlogik: do_copy, check_auto_copy, usb_monitor."""
|
||
|
||
import hashlib as _hashlib
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import threading
|
||
import time
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
from picopy.config import load_cfg, _fmt_bytes, log
|
||
from picopy.state import (
|
||
copy_state, copy_lock, save_state, append_history, add_log
|
||
)
|
||
from picopy.usb import usb_devices, ensure_mount, internal_dest_device
|
||
|
||
_copy_thread: threading.Thread | None = None
|
||
|
||
SYSTEM_EXCLUDES = {
|
||
'.DS_Store', 'Thumbs.db', 'thumbs.db', 'desktop.ini',
|
||
'.Spotlight-V100', '.Trashes', '.fseventsd', '.TemporaryItems',
|
||
'.VolumeIcon.icns', 'RECYCLER', '$RECYCLE.BIN',
|
||
'System Volume Information', '.DocumentRevisions-V100',
|
||
}
|
||
|
||
|
||
def _should_copy(f: Path, cfg: dict) -> bool:
|
||
if cfg.get('exclude_system'):
|
||
for part in f.parts:
|
||
if part in SYSTEM_EXCLUDES:
|
||
return False
|
||
if f.name.startswith('._'):
|
||
return False
|
||
filt = cfg.get('file_filter', '').strip()
|
||
if filt:
|
||
allowed = {e.strip().lower().lstrip('.') for e in filt.split(',') if e.strip()}
|
||
if f.suffix.lower().lstrip('.') not in allowed:
|
||
return False
|
||
return True
|
||
|
||
|
||
def _unique_path(p: Path) -> Path:
|
||
stem, suffix, parent = p.stem, p.suffix, p.parent
|
||
i = 1
|
||
while True:
|
||
candidate = parent / f'{stem}_({i}){suffix}'
|
||
if not candidate.exists():
|
||
return candidate
|
||
i += 1
|
||
|
||
|
||
def _file_md5(p: Path) -> str:
|
||
h = _hashlib.md5()
|
||
with open(p, 'rb') as f:
|
||
for chunk in iter(lambda: f.read(65536), b''):
|
||
h.update(chunk)
|
||
return h.hexdigest()
|
||
|
||
|
||
def _resolve_source_ports(cfg) -> list:
|
||
"""Gibt source_ports als [{port, label}]-Liste zurück. Migriert altes source_port-Feld."""
|
||
ports = cfg.get('source_ports') or []
|
||
if not ports and cfg.get('source_port'):
|
||
ports = [{'port': cfg['source_port'], 'label': cfg.get('source_label', '')}]
|
||
return ports
|
||
|
||
|
||
def _configured_destination(cfg, devs):
|
||
if cfg.get('dest_type') == 'internal':
|
||
return internal_dest_device(cfg)
|
||
return next((d for d in devs if d['usb_port'] == cfg.get('dest_port')), None)
|
||
|
||
|
||
def do_copy(src_devs, dst_dev, cfg):
    """Copy files from one or more source devices onto a single destination.

    Steps (``copy_state['phase']`` is ``'copy'`` from the start):

    1. Mount the destination and every source, collect the files that pass
       ``_should_copy`` and compare their total size with the free space on
       the destination. A shortfall only sets ``space_warning`` and prevents
       source deletion; copying still proceeds.
    2. Copy every file through a ``*.picopy_tmp`` temp file that is then
       renamed onto the final name with ``os.replace``. Existing targets are
       handled per ``cfg['duplicate_handling']``: ``'skip'`` (default) skips
       same-size files, ``'rename'`` writes ``name_(N).ext``, and anything else
       overwrites. Per-file I/O errors are counted and logged, not raised.
    3. Phase ``'verify'`` (if ``cfg['verify_checksum']``): compare source and
       destination MD5. Mismatching destination copies are deleted. Unreadable
       files count as verification failures.
    4. Phase ``'delete'`` (if ``cfg['delete_source']``): delete the source files
       of the copied (and, with verification, verified) pairs. This is skipped
       entirely after a space warning or any verification failure.

    Finally an optional NAS upload of the transferred files is started. The
    ``finally`` block waits for it, unmounts whatever this call mounted itself,
    resets ``copy_state`` to idle and appends a history entry.

    Cancellation: another thread may clear ``copy_state['running']``. This is
    polled before each source mount, each file copy and each verification step.

    Args:
        src_devs: Source device dicts (``device`` and ``label`` keys are read).
        dst_dev: Destination device dict (``device`` and ``label`` keys are read).
        cfg: Configuration mapping. ``folder_format`` (a strftime pattern for
            the dated target folder) is required.

    Exceptions are not propagated. They are logged and stored in
    ``copy_state['error']`` and in the history entry.
    """
    import json as _json

    dst_mp = None
    dst_owned = False
    src_mounts = []  # [(src_dev, src_mp, src_owned)] for every mount attempt
    _upload_thread = None
    _hist = {
        'start': time.time(),
        'ok': False, 'copied': 0, 'skipped': 0, 'errors': 0,
        'bytes': 0, 'error_msg': '',
    }

    def _cancelled():
        # True once another thread has cleared copy_state['running'].
        with copy_lock:
            return not copy_state['running']

    def _set_progress(done, total, name):
        # Publish file-level progress; acquires copy_lock itself.
        with copy_lock:
            copy_state.update(done=done,
                              progress=int(done / total * 100) if total else 100,
                              current=name)

    try:
        with copy_lock:
            copy_state.update(running=True, progress=0, error=None,
                              done=0, total=0, logs=[], current='',
                              bytes_total=0, bytes_done=0,
                              start_ts=time.time(), eta_sec=None, speed_bps=0,
                              phase='copy',
                              space_warning=False, space_needed=0, space_free=0,
                              last_success_file='')
        save_state()
        n = len(src_devs)
        add_log(f'Kopiervorgang gestartet ({n} Quelle{"n" if n != 1 else ""})')

        dst_mp, dst_owned = ensure_mount(dst_dev)
        if not dst_mp:
            raise RuntimeError(f'Ziel nicht mountbar: {dst_dev["device"]}')
        add_log(f'Ziel: {dst_mp} ({dst_dev["label"]})')

        ts = datetime.now()
        date_str = ts.strftime(cfg['folder_format'])
        if cfg.get('add_time'):
            date_str += '_' + ts.strftime('%H%M%S')

        # -- Mount all sources & collect files -------------------------------
        # source_data: [(src_dev, src_path, files, dst_dir, incomplete_marker)]
        source_data = []
        total = 0
        bytes_total = 0

        for src_dev in src_devs:
            if _cancelled():
                add_log('Abgebrochen')
                return

            src_mp_i, src_owned_i = ensure_mount(src_dev)
            # Recorded before the success check; the finally block unmounts
            # every entry that ensure_mount reported as owned.
            src_mounts.append((src_dev, src_mp_i, src_owned_i))
            if not src_mp_i:
                add_log(f'Quelle nicht mountbar: {src_dev["device"]} - übersprungen')
                continue

            add_log(f'Quelle: {src_mp_i} ({src_dev["label"]})')
            src_path = Path(src_mp_i)
            all_files = [f for f in src_path.rglob('*') if f.is_file()]
            files = [f for f in all_files if _should_copy(f, cfg)]
            n_filtered = len(all_files) - len(files)
            if n_filtered:
                add_log(f'{n_filtered} Dateien gefiltert ({src_dev["label"]})')

            # `or 'source'`: an empty label would make `dst_dir / ''` collapse
            # the per-source subfolder, and a None label would crash re.sub.
            label = re.sub(r'[^\w\-]', '_', src_dev.get('label') or 'source')
            dst_dir_i = Path(dst_mp) / date_str
            if cfg.get('subfolder'):
                dst_dir_i = dst_dir_i / label
            dst_dir_i.mkdir(parents=True, exist_ok=True)
            add_log(f'Zielordner: {dst_dir_i}')

            # Remove temp files left behind by an interrupted earlier run.
            for stale in dst_dir_i.rglob('*.picopy_tmp'):
                stale.unlink(missing_ok=True)

            # Marker file; deleted only when the run reaches its final step
            # (it stays after cancellation or an exception).
            incomplete_marker_i = dst_dir_i / '.picopy_incomplete'
            incomplete_marker_i.write_text(_json.dumps({
                'started': datetime.now().isoformat(),
                'source': src_dev.get('label', ''),
            }))

            total += len(files)
            bytes_total += sum(f.stat().st_size for f in files)
            source_data.append((src_dev, src_path, files, dst_dir_i, incomplete_marker_i))

        with copy_lock:
            copy_state['total'] = total
            copy_state['bytes_total'] = bytes_total
        add_log(f'{total} Dateien gesamt ({_fmt_bytes(bytes_total)})')

        # -- Free-space check ---------------------------------------------------
        try:
            dst_free = shutil.disk_usage(dst_mp).free
        except OSError:
            dst_free = 0
        if bytes_total > 0 and dst_free < bytes_total:
            with copy_lock:
                copy_state.update(space_warning=True,
                                  space_needed=bytes_total,
                                  space_free=dst_free)
            add_log(
                f'⚠ Nicht genug Speicherplatz! '
                f'Benötigt: {_fmt_bytes(bytes_total)}, '
                f'Verfügbar: {_fmt_bytes(dst_free)} – '
                f'Quelle wird nicht gelöscht'
            )
            save_state()

        # -- Phase 1: copy (all sources) ---------------------------------------
        dup_mode = cfg.get('duplicate_handling', 'skip')
        all_copied_pairs = []  # [(src_file, dst_file)] actually written this run
        skipped = 0
        io_errors = 0
        global_done = 0

        for src_dev_i, src_path_i, files_i, dst_dir_i, _ in source_data:
            if len(src_devs) > 1:
                add_log(f'Kopiere: {src_dev_i["label"]}')
            for f in files_i:
                if _cancelled():
                    add_log('Abgebrochen')
                    return
                global_done += 1
                rel = f.relative_to(src_path_i)
                dst_f = dst_dir_i / rel
                try:
                    dst_f.parent.mkdir(parents=True, exist_ok=True)
                except OSError as mkdir_err:
                    io_errors += 1
                    add_log(f'⚠ Verzeichnis nicht erstellbar ({dst_f.parent.name}): {mkdir_err}')
                    _set_progress(global_done, total, f.name)
                    continue

                # stat() is guarded: a vanished/unreadable source file is a
                # per-file I/O error, not a reason to abort the whole run.
                try:
                    fsize = f.stat().st_size
                except OSError as stat_err:
                    io_errors += 1
                    add_log(f'⚠ Fehler bei {f.name}: {stat_err}')
                    _set_progress(global_done, total, f.name)
                    continue

                if dst_f.exists():
                    if dup_mode == 'skip':
                        # Same size: treat as already copied. Otherwise assume
                        # a truncated earlier copy and copy it again.
                        if dst_f.stat().st_size == fsize:
                            skipped += 1
                            _set_progress(global_done, total, f.name)
                            continue
                        add_log(f'Unvollständige Datei, wird neu kopiert: {f.name}')
                    elif dup_mode == 'rename':
                        dst_f = _unique_path(dst_f)
                    # any other mode: overwrite dst_f

                # Write to a temp name first so a partially written file never
                # appears under the final name.
                tmp_f = dst_f.with_name(dst_f.name + '.picopy_tmp')
                try:
                    shutil.copy2(f, tmp_f)
                    os.replace(str(tmp_f), str(dst_f))
                except OSError as copy_err:
                    try:
                        tmp_f.unlink(missing_ok=True)
                    except OSError:
                        pass  # best effort; the stale-tmp sweep removes it next run
                    io_errors += 1
                    add_log(f'⚠ Fehler bei {f.name}: {copy_err}')
                    _set_progress(global_done, total, f.name)
                    continue
                all_copied_pairs.append((f, dst_f))

                with copy_lock:
                    copy_state['bytes_done'] += fsize
                    copy_state['last_success_file'] = str(dst_f)
                    bd = copy_state['bytes_done']
                    bt = copy_state['bytes_total']
                    elapsed = time.time() - copy_state['start_ts']
                    # No speed estimate during the first second (presumably to
                    # avoid wild values from a tiny elapsed time).
                    speed = bd / elapsed if elapsed > 1 else 0
                    eta = int((bt - bd) / speed) if speed > 0 and bt > bd else 0
                    copy_state.update(done=global_done,
                                      progress=int(global_done / total * 100) if total else 100,
                                      current=f.name, speed_bps=int(speed), eta_sec=eta)
                if global_done % 20 == 0:
                    save_state()

        msg_parts = [f'{len(all_copied_pairs)} kopiert']
        if skipped:
            msg_parts.append(f'{skipped} übersprungen')
        if io_errors:
            msg_parts.append(f'{io_errors} Fehler (I/O)')

        # -- Phase 2: verify -----------------------------------------------------
        verify_errors = 0
        # Without verification, every copied pair counts as "verified".
        verified_pairs = list(all_copied_pairs)

        if cfg.get('verify_checksum') and all_copied_pairs:
            n_pairs = len(all_copied_pairs)
            with copy_lock:
                copy_state.update(phase='verify', progress=0, done=0,
                                  total=n_pairs, current='',
                                  eta_sec=None, speed_bps=0)
            add_log(f'Verifiziere {n_pairs} Dateien...')
            verified_pairs = []

            for i, (src_f, dst_f) in enumerate(all_copied_pairs, start=1):
                with copy_lock:
                    cancelled = not copy_state['running']
                    if not cancelled:
                        copy_state.update(done=i,
                                          progress=int(i / n_pairs * 100),
                                          current=src_f.name)
                if cancelled:
                    add_log('Abgebrochen')
                    return
                try:
                    matches = _file_md5(src_f) == _file_md5(dst_f)
                except OSError as read_err:
                    # Unreadable file: count it as a verification failure (which
                    # also blocks source deletion) instead of aborting the run.
                    verify_errors += 1
                    add_log(f'⚠ Prüfung nicht möglich: {src_f.name}: {read_err}')
                    continue
                if matches:
                    verified_pairs.append((src_f, dst_f))
                else:
                    verify_errors += 1
                    add_log(f'⚠ Prüfsummenfehler: {src_f.name}')
                    try:
                        dst_f.unlink()
                    except OSError as rm_err:
                        log.warning(f'Entfernen fehlgeschlagen: {dst_f}: {rm_err}')

            if verify_errors:
                msg_parts.append(f'{verify_errors} Prüfsummenfehler!')
                add_log(f'Verifizierung: {verify_errors} Fehler!')
            else:
                add_log(f'Alle {len(verified_pairs)} Dateien verifiziert ✓')

        # -- Phase 3: delete source files ------------------------------------
        if cfg.get('delete_source') and verified_pairs:
            with copy_lock:
                _space_warn = copy_state.get('space_warning', False)
            if _space_warn:
                add_log('Quelldateien NICHT gelöscht (Speicherplatz unzureichend)')
            elif verify_errors:
                add_log('Quelldateien NICHT gelöscht (Prüfsummenfehler)')
            else:
                with copy_lock:
                    copy_state.update(phase='delete', current='')
                add_log(f'Lösche {len(verified_pairs)} Quelldateien...')
                del_errors = 0
                for src_f, _ in verified_pairs:
                    try:
                        src_f.unlink()
                    except Exception as e:
                        del_errors += 1
                        log.warning(f'Löschen fehlgeschlagen: {src_f}: {e}')
                if del_errors:
                    msg_parts.append(f'{del_errors} Löschfehler')
                else:
                    add_log('Quelle geleert ✓')

        subprocess.run(['sync'], capture_output=True)
        for *_, incomplete_marker_i in source_data:
            try:
                incomplete_marker_i.unlink(missing_ok=True)
            except OSError:
                pass  # a leftover marker is harmless; it only flags the folder

        with copy_lock:
            copy_state['last_copy'] = datetime.now().isoformat()
            _hist['bytes'] = copy_state['bytes_done']
        # History "errors" covers both copy I/O errors and verification failures.
        _hist.update(ok=True, copied=len(all_copied_pairs),
                     skipped=skipped, errors=io_errors + verify_errors)
        add_log('Fertig! ' + ', '.join(msg_parts))

        dst_dir_root = Path(dst_mp) / date_str
        upload_files = [dst_f for _, dst_f in verified_pairs if dst_f.exists()]
        if upload_files:
            from picopy.upload import run_uploads
            _upload_thread = threading.Thread(
                target=run_uploads,
                args=(dst_dir_root, cfg, upload_files),
                daemon=True
            )
            _upload_thread.start()
        elif any(t.get('enabled') for t in cfg.get('upload_targets', [])):
            add_log('NAS-Upload: keine neu auf das Ziel übertragenen Dateien')

    except Exception as e:
        log.exception('Copy failed')
        with copy_lock:
            copy_state['error'] = str(e)
        _hist['error_msg'] = str(e)
        add_log(f'Fehler: {e}')

    finally:
        # Wait for the NAS upload (it reads from dst_mp) before unmounting.
        if _upload_thread is not None and _upload_thread.is_alive():
            add_log('Warte auf NAS-Upload vor Unmount...')
            _upload_thread.join()
        subprocess.run(['sync'], capture_output=True)
        for _, src_mp_i, src_owned_i in src_mounts:
            if src_owned_i and src_mp_i:
                subprocess.run(['umount', src_mp_i], capture_output=True)
        if dst_owned and dst_mp:
            subprocess.run(['umount', dst_mp], capture_output=True)
        with copy_lock:
            copy_state['running'] = False
            copy_state['current'] = ''
            copy_state['phase'] = 'idle'
        save_state()
        # Record the run in the history (also for failed/cancelled runs).
        append_history({
            'ts': datetime.now().isoformat(),
            'duration': int(time.time() - _hist['start']),
            'sources': [d.get('label', d.get('device', '?')) for d in src_devs],
            'dest': dst_dev.get('label', dst_dev.get('device', '?')) if dst_dev else '?',
            'copied': _hist['copied'],
            'skipped': _hist['skipped'],
            'errors': _hist['errors'],
            'bytes': _hist['bytes'],
            'ok': _hist['ok'],
            'error': _hist['error_msg'],
        })
|
||
|
||
|
||
def check_auto_copy():
    """Start a background copy if auto-copy is enabled and all devices are present.

    Returns without doing anything in any of these cases:

    * ``auto_copy`` is off or no source port is configured;
    * no destination is configured (neither internal nor a ``dest_port``);
    * a copy is running or an unacknowledged error is stored in ``copy_state``;
    * no configured source or no destination is currently connected.

    Otherwise it marks the copy as running and starts ``do_copy`` in a daemon
    thread with the connected sources (in configured order) and the destination.
    """
    cfg = load_cfg()
    src_ports = _resolve_source_ports(cfg)
    if not cfg.get('auto_copy') or not src_ports:
        return
    if cfg.get('dest_type') != 'internal' and not cfg.get('dest_port'):
        return
    # Cheap early exit before scanning USB devices.
    with copy_lock:
        if copy_state['running'] or copy_state['error']:
            return

    devs = usb_devices()
    srcs = [next((d for d in devs if d['usb_port'] == sp['port']), None) for sp in src_ports]
    srcs = [s for s in srcs if s is not None]
    dst = _configured_destination(cfg, devs)
    if not (srcs and dst):
        return

    # Claim the slot atomically before starting the worker. Several udev "add"
    # events (e.g. source and destination plugged in together) schedule checks
    # a few ms apart. do_copy sets running=True only once its thread runs, so
    # without this each check could start its own concurrent copy.
    with copy_lock:
        if copy_state['running'] or copy_state['error']:
            return
        copy_state['running'] = True

    log.info(f'Auto-Copy: {len(srcs)} Quelle(n) und Ziel verbunden')
    try:
        threading.Thread(target=do_copy, args=(srcs, dst, cfg), daemon=True).start()
    except Exception:
        # Thread never started, so do_copy's finally will not reset the flag.
        with copy_lock:
            copy_state['running'] = False
        raise
|
||
|
||
|
||
def usb_monitor():
    """Watch udev for newly added block disks and schedule an auto-copy check.

    Subscribes to udev events filtered to ``subsystem='block'`` and
    ``device_type='disk'``. For every ``add`` event it logs the device node and
    runs ``check_auto_copy`` 3 seconds later in a timer thread (presumably to
    let partitions/mounts settle; confirm). It loops over ``mon.poll()`` until
    that returns None, so the caller should run it in its own thread.

    If ``pyudev`` is not installed, a warning is logged and it returns.
    """
    try:
        import pyudev
    except ImportError:
        log.warning('pyudev nicht verfügbar')
        return

    ctx = pyudev.Context()
    mon = pyudev.Monitor.from_netlink(ctx)
    mon.filter_by(subsystem='block', device_type='disk')
    for dev in iter(mon.poll, None):
        if dev.action == 'add':
            log.info(f'USB eingesteckt: {dev.device_node}')
            timer = threading.Timer(3.0, check_auto_copy)
            # Daemon: a pending check must neither delay interpreter shutdown
            # nor start a copy while the process is exiting.
            timer.daemon = True
            timer.start()
|