"""Fichero / D11s thermal label printer - BLE + Classic Bluetooth interface.""" import asyncio import errno import logging import os import re import subprocess import sys from contextlib import asynccontextmanager from pathlib import Path from typing import AsyncGenerator import bleak from bleak import BleakClient, BleakScanner from bleak.exc import BleakDBusError, BleakError # Local imports # render_label was removed - using prepare_image instead log = logging.getLogger(__name__) # --- Constants --- RFCOMM_CHANNEL = 1 # BLE service UUIDs that the Fichero/D11s exposes SERVICE_UUID = "000018f0-0000-1000-8000-00805f9b34fb" WRITE_UUID = "00002af1-0000-1000-8000-00805f9b34fb" NOTIFY_UUID = "00002af0-0000-1000-8000-00805f9b34fb" # Printer name prefixes to auto-discover PRINTER_NAME_PREFIXES = ("FICHERO_", "D11s_") # --- Constants --- PRINTHEAD_PX = 96 # Fichero/D11s printhead width in pixels BYTES_PER_ROW = PRINTHEAD_PX // 8 # 12 bytes per row (96 pixels / 8) DOTS_PER_MM = 8 # 203 DPI / 25.4 mm/inch ≈ 8 dots/mm DELAY_AFTER_DENSITY = 0.10 # printer needs time to apply density setting DELAY_COMMAND_GAP = 0.05 # minimum gap between sequential commands DELAY_RASTER_SETTLE = 0.50 # wait for printhead after raster transfer DELAY_AFTER_FEED = 0.30 # wait after form feed before stop command DELAY_NOTIFY_EXTRA = 0.05 # extra wait for trailing BLE notification fragments # Paper types PAPER_GAP = 0x00 PAPER_BLACK_MARK = 0x01 PAPER_CONTINUOUS = 0x02 # --- Timing constants --- CHUNK_SIZE_BLE = 200 # BLE MTU-limited CHUNK_SIZE_CLASSIC = 4096 # RFCOMM can handle larger chunks DELAY_CHUNK_GAP = 0.02 # inter-chunk pacing for throughput DELAY_NOTIFY_EXTRA = 0.05 # extra wait for trailing BLE notification fragments BLE_CONNECT_RETRIES = 3 # retry transient BLE connect failures BLE_CONNECT_BACKOFF = 0.7 # base backoff in seconds (linear: n * base) # --- Exceptions --- class PrinterError(Exception): """Base exception for printer operations.""" class PrinterNotFound(PrinterError): """No Fichero/D11s printer found during BLE scan.""" class PrinterTimeout(PrinterError): """Printer did not respond within the expected time.""" class PrinterNotReady(PrinterError): """Printer status indicates it cannot print.""" # --- Discovery --- async def find_printer() -> str: """Scan BLE for a Fichero/D11s printer. Returns the address.""" print("Scanning for printer...") devices = await BleakScanner.discover(timeout=8) for d in devices: if d.name and any(d.name.startswith(p) for p in PRINTER_NAME_PREFIXES): print(f" Found {d.name} at {d.address}") return d.address raise PrinterNotFound("No Fichero/D11s printer found. Is it turned on?") async def resolve_ble_target(address: str | None = None): """Resolve a BLE target as Bleak device object when possible. Passing a discovered device object to BleakClient helps BlueZ keep the correct LE context for dual-mode environments. """ if address: device = await BleakScanner.find_device_by_address(address, timeout=8.0) if device is not None: return device # Fallback to active scan/match before giving up. devices = await BleakScanner.discover(timeout=8) for d in devices: if d.address and d.address.lower() == address.lower(): return d print(f" Warning: BLE device {address} not found in scan. Falling back to direct address connection.") return address devices = await BleakScanner.discover(timeout=8) for d in devices: if d.name and any(d.name.startswith(p) for p in PRINTER_NAME_PREFIXES): print(f" Found {d.name} at {d.address}") return d raise PrinterNotFound("No Fichero/D11s printer found. Is it turned on?") # --- Status --- class PrinterStatus: """Current printer status flags.""" def __init__(self, raw: int): self.raw = raw @property def ok(self) -> bool: return self.raw == 0 @property def printing(self) -> bool: return bool(self.raw & 0x01) @property def cover_open(self) -> bool: return bool(self.raw & 0x02) @property def no_paper(self) -> bool: return bool(self.raw & 0x04) @property def low_battery(self) -> bool: return bool(self.raw & 0x08) @property def overheated(self) -> bool: return bool(self.raw & 0x40) @property def charging(self) -> bool: return bool(self.raw & 0x20) def __str__(self): parts = [] if self.printing: parts.append("printing") if self.cover_open: parts.append("cover_open") if self.no_paper: parts.append("no_paper") if self.low_battery: parts.append("low_battery") if self.overheated: parts.append("overheated") if self.charging: parts.append("charging") if not parts: parts.append("ready") return "(" + ", ".join(parts) + ")" # --- RFCOMM (Classic Bluetooth) support --- import socket _RFCOMM_AVAILABLE = False try: import _socket # type: ignore _RFCOMM_AVAILABLE = hasattr(_socket, "AF_BLUETOOTH") except ImportError: pass class RFCOMMClient: """Simple RFCOMM socket wrapper.""" def __init__(self, address: str, channel: int): self._address = address self._channel = channel self._sock: socket.socket | None = None async def __aenter__(self): if not _RFCOMM_AVAILABLE: raise PrinterError( "RFCOMM (Classic Bluetooth) is not available on this platform. " "BLE is recommended for cross-platform support." ) sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) # Set a reasonable timeout (10s) to avoid hanging indefinitely sock.settimeout(10.0) await asyncio.get_event_loop().run_in_executor( None, lambda: sock.connect((self._address, self._channel)) ) self._sock = sock return self async def __aexit__(self, exc_type, exc_val, exc_tb): if self._sock: try: await asyncio.get_event_loop().run_in_executor(None, self._sock.close) except Exception: pass self._sock = None def fileno(self): if self._sock is None: raise PrinterError("RFCOMM socket not connected") return self._sock.fileno() def read(self, n: int) -> bytes: if self._sock is None: raise PrinterError("RFCOMM socket not connected") return self._sock.recv(n) def write(self, data: bytes) -> None: if self._sock is None: raise PrinterError("RFCOMM socket not connected") self._sock.sendall(data) # --- Printer Client --- class PrinterClient: """Low-level printer protocol client.""" def __init__(self, transport): self._transport = transport self._seq = 0 async def start(self): """Initialize printer and verify it's ready.""" # Wake up the printer with a few null bytes (some printers need this) await self._write(b"\x00" * 12) # Enable printer (AiYin-specific command) await self._write(b"\x10\xFF\xFE\x01") # Get status to verify communication status = await self.get_status() if not status.ok: raise PrinterError(f"Printer not ready: {status}") async def get_status(self) -> PrinterStatus: """Query real-time printer status.""" await self._write(b"\x10\xFF\x40") resp = await self._read(1) return PrinterStatus(resp[0] if resp else 0) async def get_info(self) -> dict[str, str]: """Query static printer information.""" # Get model await self._write(b"\x10\xFF\x20\xF0") model = (await self._read(16)).rstrip(b"\x00").decode("ascii", errors="ignore") # Get firmware await self._write(b"\x10\xFF\x20\xF1") firmware = (await self._read(16)).rstrip(b"\x00").decode("ascii", errors="ignore") # Get serial await self._write(b"\x10\xFF\x20\xF2") serial = (await self._read(32)).rstrip(b"\x00").decode("ascii", errors="ignore") # Get battery await self._write(b"\x10\xFF\x50\xF1") battery = (await self._read(2))[1] if (await self._read(2)) else 0 return {"model": model, "firmware": firmware, "serial": serial, "battery": battery} async def get_all_info(self) -> dict[str, str]: """Query all printer information in one pipe-delimited response.""" await self._write(b"\x10\xFF\x70") resp = await self._read(128) if not resp or b"|" not in resp: return {} parts = resp.decode("ascii", errors="ignore").strip().split("|") if len(parts) >= 5: return { "bt_name": parts[0], "bt_classic": parts[1], "bt_ble": parts[2], "firmware": parts[3], "serial": parts[4], "battery": parts[5] if len(parts) > 5 else "unknown", } return {} async def _write(self, data: bytes): """Write data to printer with chunking and pacing.""" chunk_size = CHUNK_SIZE_BLE if hasattr(self._transport, '_sock') else CHUNK_SIZE_CLASSIC for i in range(0, len(data), chunk_size): chunk = data[i : i + chunk_size] self._transport.write(chunk) await asyncio.sleep(DELAY_CHUNK_GAP) async def _read(self, n: int, timeout: float = 2.0) -> bytes: """Read exactly n bytes from printer.""" buf = bytearray() start = asyncio.get_event_loop().time() while len(buf) < n: if asyncio.get_event_loop().time() - start > timeout: raise PrinterError(f"Timeout reading {n} bytes (got {len(buf)})") chunk = self._transport.read(min(256, n - len(buf))) if not chunk: await asyncio.sleep(DELAY_NOTIFY_EXTRA) continue buf.extend(chunk) return bytes(buf) async def print_raster(self, raster: bytes, label_height: int): """Print a raster image to the printer.""" # Enable printer await self._write(b"\x10\xFF\xFE\x01") # Send raster header (GS v 0: 96px wide, mode 0, height=label_height) header = bytearray([0x1D, 0x76, 0x30, 0x00, 0x0C, 0x00]) header.extend(label_height.to_bytes(2, 'little')) await self._write(bytes(header)) # Send raster data await self._write(raster) # Form feed await self._write(b"\x1D\x0C") # Stop print and wait for completion await self._write(b"\x10\xFF\xFE\x45") # Wait for completion response (0xAA or "OK") await asyncio.sleep(0.5) # Give printer time to process async def close(self): """Close the connection cleanly.""" try: # Stop any ongoing print job await self._write(b"\x10\xFF\xFE\x45") await asyncio.sleep(0.2) except Exception: pass # --- High-level API --- @asynccontextmanager async def connect( address: str | None = None, classic: bool = False, channel: int = RFCOMM_CHANNEL, ) -> AsyncGenerator[PrinterClient, None]: """Discover printer, connect, and yield a ready PrinterClient.""" if classic: if not address: raise PrinterError("--address is required for Classic Bluetooth (no scanning)") # Try channels 1, 2, 3 - most common for Fichero printers for ch in [channel, 1, 2, 3]: if ch > 0: try: async with RFCOMMClient(address, ch) as client: pc = PrinterClient(client) await pc.start() yield pc return except (PrinterError, PrinterTimeout): # Try next channel on error continue # All channels failed raise PrinterError(f"Classic Bluetooth connection failed for '{address}' after trying channels {channel}, 1, 2, 3") # BLE connection - keep it simple like original code try: target = await resolve_ble_target(address) async with BleakClient(target) as client: pc = PrinterClient(client) await pc.start() yield pc return except Exception as exc: raise PrinterError(f"BLE connection failed: {exc}") from exc