Files
Fichero/fichero_printer/fichero/printer.py
paul2212 fe7036518c v0.1.43: Removed unused render_label import\n\n- Fixed ImportError by removing unused render_label import
- Function was removed in previous refactoring
- Using prepare_image instead for label rendering
- Updated version to 0.1.43 for Home Assistant recognition\n
Generated by Mistral Vibe.\nCo-Authored-By: Mistral Vibe <vibe@mistral.ai>
2026-03-18 20:13:03 +01:00

366 lines
12 KiB
Python

"""Fichero / D11s thermal label printer - BLE + Classic Bluetooth interface."""
import asyncio
import errno
import logging
import os
import re
import subprocess
import sys
from contextlib import asynccontextmanager
from pathlib import Path
from typing import AsyncGenerator
import bleak
from bleak import BleakClient, BleakScanner
from bleak.exc import BleakDBusError, BleakError
# Local imports
# render_label was removed - using prepare_image instead
log = logging.getLogger(__name__)
# --- Constants ---
RFCOMM_CHANNEL = 1
# BLE service UUIDs that the Fichero/D11s exposes
SERVICE_UUID = "000018f0-0000-1000-8000-00805f9b34fb"
WRITE_UUID = "00002af1-0000-1000-8000-00805f9b34fb"
NOTIFY_UUID = "00002af0-0000-1000-8000-00805f9b34fb"
# Printer name prefixes to auto-discover
PRINTER_NAME_PREFIXES = ("FICHERO_", "D11s_")
# --- Timing constants ---
CHUNK_SIZE_BLE = 200 # BLE MTU-limited
CHUNK_SIZE_CLASSIC = 4096 # RFCOMM can handle larger chunks
DELAY_CHUNK_GAP = 0.02 # inter-chunk pacing for throughput
DELAY_NOTIFY_EXTRA = 0.05 # extra wait for trailing BLE notification fragments
BLE_CONNECT_RETRIES = 3 # retry transient BLE connect failures
BLE_CONNECT_BACKOFF = 0.7 # base backoff in seconds (linear: n * base)
# --- Exceptions ---
class PrinterError(Exception):
"""Base exception for printer operations."""
class PrinterNotFound(PrinterError):
"""No Fichero/D11s printer found during BLE scan."""
class PrinterTimeout(PrinterError):
"""Printer did not respond within the expected time."""
class PrinterNotReady(PrinterError):
"""Printer status indicates it cannot print."""
# --- Discovery ---
async def find_printer() -> str:
"""Scan BLE for a Fichero/D11s printer. Returns the address."""
print("Scanning for printer...")
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.name and any(d.name.startswith(p) for p in PRINTER_NAME_PREFIXES):
print(f" Found {d.name} at {d.address}")
return d.address
raise PrinterNotFound("No Fichero/D11s printer found. Is it turned on?")
async def resolve_ble_target(address: str | None = None):
"""Resolve a BLE target as Bleak device object when possible.
Passing a discovered device object to BleakClient helps BlueZ keep the
correct LE context for dual-mode environments.
"""
if address:
device = await BleakScanner.find_device_by_address(address, timeout=8.0)
if device is not None:
return device
# Fallback to active scan/match before giving up.
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.address and d.address.lower() == address.lower():
return d
print(f" Warning: BLE device {address} not found in scan. Falling back to direct address connection.")
return address
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.name and any(d.name.startswith(p) for p in PRINTER_NAME_PREFIXES):
print(f" Found {d.name} at {d.address}")
return d
raise PrinterNotFound("No Fichero/D11s printer found. Is it turned on?")
# --- Status ---
class PrinterStatus:
"""Current printer status flags."""
def __init__(self, raw: int):
self.raw = raw
@property
def ok(self) -> bool:
return self.raw == 0
@property
def printing(self) -> bool:
return bool(self.raw & 0x01)
@property
def cover_open(self) -> bool:
return bool(self.raw & 0x02)
@property
def no_paper(self) -> bool:
return bool(self.raw & 0x04)
@property
def low_battery(self) -> bool:
return bool(self.raw & 0x08)
@property
def overheated(self) -> bool:
return bool(self.raw & 0x40)
@property
def charging(self) -> bool:
return bool(self.raw & 0x20)
def __str__(self):
parts = []
if self.printing:
parts.append("printing")
if self.cover_open:
parts.append("cover_open")
if self.no_paper:
parts.append("no_paper")
if self.low_battery:
parts.append("low_battery")
if self.overheated:
parts.append("overheated")
if self.charging:
parts.append("charging")
if not parts:
parts.append("ready")
return "(" + ", ".join(parts) + ")"
# --- RFCOMM (Classic Bluetooth) support ---
import socket
_RFCOMM_AVAILABLE = False
try:
import _socket # type: ignore
_RFCOMM_AVAILABLE = hasattr(_socket, "AF_BLUETOOTH")
except ImportError:
pass
class RFCOMMClient:
"""Simple RFCOMM socket wrapper."""
def __init__(self, address: str, channel: int):
self._address = address
self._channel = channel
self._sock: socket.socket | None = None
async def __aenter__(self):
if not _RFCOMM_AVAILABLE:
raise PrinterError(
"RFCOMM (Classic Bluetooth) is not available on this platform. "
"BLE is recommended for cross-platform support."
)
sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
# Set a reasonable timeout (10s) to avoid hanging indefinitely
sock.settimeout(10.0)
await asyncio.get_event_loop().run_in_executor(
None, lambda: sock.connect((self._address, self._channel))
)
self._sock = sock
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._sock:
try:
await asyncio.get_event_loop().run_in_executor(None, self._sock.close)
except Exception:
pass
self._sock = None
def fileno(self):
if self._sock is None:
raise PrinterError("RFCOMM socket not connected")
return self._sock.fileno()
def read(self, n: int) -> bytes:
if self._sock is None:
raise PrinterError("RFCOMM socket not connected")
return self._sock.recv(n)
def write(self, data: bytes) -> None:
if self._sock is None:
raise PrinterError("RFCOMM socket not connected")
self._sock.sendall(data)
# --- Printer Client ---
class PrinterClient:
"""Low-level printer protocol client."""
def __init__(self, transport):
self._transport = transport
self._seq = 0
async def start(self):
"""Initialize printer and verify it's ready."""
# Wake up the printer with a few null bytes (some printers need this)
await self._write(b"\x00" * 12)
# Enable printer (AiYin-specific command)
await self._write(b"\x10\xFF\xFE\x01")
# Get status to verify communication
status = await self.get_status()
if not status.ok:
raise PrinterError(f"Printer not ready: {status}")
async def get_status(self) -> PrinterStatus:
"""Query real-time printer status."""
await self._write(b"\x10\xFF\x40")
resp = await self._read(1)
return PrinterStatus(resp[0] if resp else 0)
async def get_info(self) -> dict[str, str]:
"""Query static printer information."""
# Get model
await self._write(b"\x10\xFF\x20\xF0")
model = (await self._read(16)).rstrip(b"\x00").decode("ascii", errors="ignore")
# Get firmware
await self._write(b"\x10\xFF\x20\xF1")
firmware = (await self._read(16)).rstrip(b"\x00").decode("ascii", errors="ignore")
# Get serial
await self._write(b"\x10\xFF\x20\xF2")
serial = (await self._read(32)).rstrip(b"\x00").decode("ascii", errors="ignore")
# Get battery
await self._write(b"\x10\xFF\x50\xF1")
battery = (await self._read(2))[1] if (await self._read(2)) else 0
return {"model": model, "firmware": firmware, "serial": serial, "battery": battery}
async def get_all_info(self) -> dict[str, str]:
"""Query all printer information in one pipe-delimited response."""
await self._write(b"\x10\xFF\x70")
resp = await self._read(128)
if not resp or b"|" not in resp:
return {}
parts = resp.decode("ascii", errors="ignore").strip().split("|")
if len(parts) >= 5:
return {
"bt_name": parts[0],
"bt_classic": parts[1],
"bt_ble": parts[2],
"firmware": parts[3],
"serial": parts[4],
"battery": parts[5] if len(parts) > 5 else "unknown",
}
return {}
async def _write(self, data: bytes):
"""Write data to printer with chunking and pacing."""
chunk_size = CHUNK_SIZE_BLE if hasattr(self._transport, '_sock') else CHUNK_SIZE_CLASSIC
for i in range(0, len(data), chunk_size):
chunk = data[i : i + chunk_size]
self._transport.write(chunk)
await asyncio.sleep(DELAY_CHUNK_GAP)
async def _read(self, n: int, timeout: float = 2.0) -> bytes:
"""Read exactly n bytes from printer."""
buf = bytearray()
start = asyncio.get_event_loop().time()
while len(buf) < n:
if asyncio.get_event_loop().time() - start > timeout:
raise PrinterError(f"Timeout reading {n} bytes (got {len(buf)})")
chunk = self._transport.read(min(256, n - len(buf)))
if not chunk:
await asyncio.sleep(DELAY_NOTIFY_EXTRA)
continue
buf.extend(chunk)
return bytes(buf)
async def print_raster(self, raster: bytes, label_height: int):
"""Print a raster image to the printer."""
# Enable printer
await self._write(b"\x10\xFF\xFE\x01")
# Send raster header (GS v 0: 96px wide, mode 0, height=label_height)
header = bytearray([0x1D, 0x76, 0x30, 0x00, 0x0C, 0x00])
header.extend(label_height.to_bytes(2, 'little'))
await self._write(bytes(header))
# Send raster data
await self._write(raster)
# Form feed
await self._write(b"\x1D\x0C")
# Stop print and wait for completion
await self._write(b"\x10\xFF\xFE\x45")
# Wait for completion response (0xAA or "OK")
await asyncio.sleep(0.5) # Give printer time to process
async def close(self):
"""Close the connection cleanly."""
try:
# Stop any ongoing print job
await self._write(b"\x10\xFF\xFE\x45")
await asyncio.sleep(0.2)
except Exception:
pass
# --- High-level API ---
@asynccontextmanager
async def connect(
address: str | None = None,
classic: bool = False,
channel: int = RFCOMM_CHANNEL,
) -> AsyncGenerator[PrinterClient, None]:
"""Discover printer, connect, and yield a ready PrinterClient."""
if classic:
if not address:
raise PrinterError("--address is required for Classic Bluetooth (no scanning)")
# Try channels 1, 2, 3 - most common for Fichero printers
for ch in [channel, 1, 2, 3]:
if ch > 0:
try:
async with RFCOMMClient(address, ch) as client:
pc = PrinterClient(client)
await pc.start()
yield pc
return
except (PrinterError, PrinterTimeout):
# Try next channel on error
continue
# All channels failed
raise PrinterError(f"Classic Bluetooth connection failed for '{address}' after trying channels {channel}, 1, 2, 3")
# BLE connection - keep it simple like original code
try:
target = await resolve_ble_target(address)
async with BleakClient(target) as client:
pc = PrinterClient(client)
await pc.start()
yield pc
return
except Exception as exc:
raise PrinterError(f"BLE connection failed: {exc}") from exc