- Required by fichero.cli for proper printer operation - Maintains compatibility with original code\n- Updated version to 0.1.45 for Home Assistant recognition\n\nGenerated by Mistral Vibe.\nCo-Authored-By: Mistral Vibe <vibe@mistral.ai>
372 lines
12 KiB
Python
372 lines
12 KiB
Python
"""Fichero / D11s thermal label printer - BLE + Classic Bluetooth interface."""
|
|
|
|
import asyncio
|
|
import errno
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from contextlib import asynccontextmanager
|
|
from pathlib import Path
|
|
from typing import AsyncGenerator
|
|
|
|
import bleak
|
|
from bleak import BleakClient, BleakScanner
|
|
from bleak.exc import BleakDBusError, BleakError
|
|
|
|
# Local imports
|
|
# render_label was removed - using prepare_image instead
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# --- Constants ---
|
|
RFCOMM_CHANNEL = 1
|
|
|
|
# BLE service UUIDs that the Fichero/D11s exposes
|
|
SERVICE_UUID = "000018f0-0000-1000-8000-00805f9b34fb"
|
|
WRITE_UUID = "00002af1-0000-1000-8000-00805f9b34fb"
|
|
NOTIFY_UUID = "00002af0-0000-1000-8000-00805f9b34fb"
|
|
|
|
# Printer name prefixes to auto-discover
|
|
PRINTER_NAME_PREFIXES = ("FICHERO_", "D11s_")
|
|
|
|
# --- Printhead geometry and print-setting constants ---
|
|
PRINTHEAD_PX = 96 # Fichero/D11s printhead width in pixels
|
|
BYTES_PER_ROW = PRINTHEAD_PX // 8 # 12 bytes per row (96 pixels / 8)
|
|
DOTS_PER_MM = 8 # 203 DPI / 25.4 mm/inch ≈ 8 dots/mm
|
|
DELAY_AFTER_DENSITY = 0.10 # printer needs time to apply density setting
|
|
|
|
# --- Transfer chunking, pacing and retry constants ---
|
|
CHUNK_SIZE_BLE = 200 # BLE MTU-limited
|
|
CHUNK_SIZE_CLASSIC = 4096 # RFCOMM can handle larger chunks
|
|
DELAY_CHUNK_GAP = 0.02 # inter-chunk pacing for throughput
|
|
DELAY_NOTIFY_EXTRA = 0.05 # extra wait for trailing BLE notification fragments
|
|
BLE_CONNECT_RETRIES = 3 # retry transient BLE connect failures
|
|
BLE_CONNECT_BACKOFF = 0.7 # base backoff in seconds (linear: n * base)
|
|
|
|
# --- Exceptions ---
|
|
|
|
|
|
class PrinterError(Exception):
    """Common parent for every error raised by printer operations."""
|
|
|
|
|
|
class PrinterNotFound(PrinterError):
    """Raised when a BLE scan turns up no Fichero/D11s printer."""
|
|
|
|
|
|
class PrinterTimeout(PrinterError):
    """Raised when the printer fails to answer within the expected time."""
|
|
|
|
|
|
class PrinterNotReady(PrinterError):
    """Raised when the reported printer status means it cannot print."""
|
|
|
|
|
|
# --- Discovery ---
|
|
|
|
|
|
async def find_printer() -> str:
    """Run one BLE scan and return the address of the first matching printer.

    A device matches when it advertises a name that starts with one of
    ``PRINTER_NAME_PREFIXES``. Progress is reported on stdout.

    Returns:
        The address of the first matching device, in scan order.

    Raises:
        PrinterNotFound: If no scanned device has a matching name.
    """
    print("Scanning for printer...")
    scanned = await BleakScanner.discover(timeout=8)
    # str.startswith accepts the tuple of prefixes directly.
    match = next(
        (
            dev
            for dev in scanned
            if dev.name and dev.name.startswith(PRINTER_NAME_PREFIXES)
        ),
        None,
    )
    if match is None:
        raise PrinterNotFound("No Fichero/D11s printer found. Is it turned on?")
    print(f" Found {match.name} at {match.address}")
    return match.address
|
|
|
|
|
|
async def resolve_ble_target(address: str | None = None):
    """Turn an optional address into the best available BleakClient target.

    Per the original author's note, handing BleakClient a discovered device
    object (rather than a bare address string) helps BlueZ keep the correct
    LE context in dual-mode environments, so a device object is preferred.

    Args:
        address: Address of the printer, or ``None``/empty to auto-discover
            by name prefix.

    Returns:
        A discovered device object when one is found. If ``address`` was
        given but never seen in either scan, the ``address`` string itself.

    Raises:
        PrinterNotFound: If no address was given and no scanned device has a
            name starting with one of ``PRINTER_NAME_PREFIXES``.
    """
    if not address:
        # Auto-discovery: take the first device with a recognised name prefix.
        for candidate in await BleakScanner.discover(timeout=8):
            name = candidate.name
            if name and any(name.startswith(prefix) for prefix in PRINTER_NAME_PREFIXES):
                print(f" Found {candidate.name} at {candidate.address}")
                return candidate
        raise PrinterNotFound("No Fichero/D11s printer found. Is it turned on?")

    # Targeted lookup first.
    found = await BleakScanner.find_device_by_address(address, timeout=8.0)
    if found is not None:
        return found

    # Fall back to a full scan with a case-insensitive address match.
    wanted = address.lower()
    for candidate in await BleakScanner.discover(timeout=8):
        if candidate.address and candidate.address.lower() == wanted:
            return candidate

    print(f" Warning: BLE device {address} not found in scan. Falling back to direct address connection.")
    return address
|
|
|
|
|
|
# --- Status ---
|
|
|
|
|
|
class PrinterStatus:
    """Bit-flag view over the raw status byte reported by the printer.

    Each boolean property tests one bit of ``raw``; ``ok`` is true only when
    ``raw`` is exactly zero, i.e. no bit at all is set.
    """

    # (label, bit mask) pairs in the order __str__ lists them.
    _FLAGS = (
        ("printing", 0x01),
        ("cover_open", 0x02),
        ("no_paper", 0x04),
        ("low_battery", 0x08),
        ("overheated", 0x40),
        ("charging", 0x20),
    )

    def __init__(self, raw: int):
        self.raw = raw

    def _is_set(self, mask: int) -> bool:
        """Return whether any bit of *mask* is set in the raw status."""
        return bool(self.raw & mask)

    @property
    def ok(self) -> bool:
        return self.raw == 0

    @property
    def printing(self) -> bool:
        return self._is_set(0x01)

    @property
    def cover_open(self) -> bool:
        return self._is_set(0x02)

    @property
    def no_paper(self) -> bool:
        return self._is_set(0x04)

    @property
    def low_battery(self) -> bool:
        return self._is_set(0x08)

    @property
    def overheated(self) -> bool:
        return self._is_set(0x40)

    @property
    def charging(self) -> bool:
        return self._is_set(0x20)

    def __str__(self):
        """Render the set flags as ``(a, b, ...)``, or ``(ready)`` if none."""
        labels = [label for label, mask in self._FLAGS if self.raw & mask]
        return "(" + ", ".join(labels or ["ready"]) + ")"
|
|
|
|
|
|
# --- RFCOMM (Classic Bluetooth) support ---
|
|
|
|
|
|
import socket
|
|
|
|
# RFCOMMClient opens socket.socket(AF_BLUETOOTH, SOCK_STREAM, BTPROTO_RFCOMM),
# so both constants must exist. Probe the public ``socket`` module (the one
# actually used) rather than the private ``_socket`` implementation module.
_RFCOMM_AVAILABLE = hasattr(socket, "AF_BLUETOOTH") and hasattr(socket, "BTPROTO_RFCOMM")
|
|
|
|
|
|
class RFCOMMClient:
    """Async context manager around a blocking RFCOMM (Classic Bluetooth) socket.

    Entering the context opens and connects the socket; leaving it closes the
    socket. ``read``/``write``/``fileno`` are thin synchronous pass-throughs
    that raise ``PrinterError`` while no socket is connected.
    """

    def __init__(self, address: str, channel: int):
        """Remember the target; no socket is opened until the context is entered.

        Args:
            address: Bluetooth address passed to ``socket.connect``.
            channel: RFCOMM channel number passed to ``socket.connect``.
        """
        self._address = address
        self._channel = channel
        self._sock: socket.socket | None = None

    async def __aenter__(self):
        """Open the socket and connect it in the default executor.

        Raises:
            PrinterError: If RFCOMM sockets are unavailable on this platform.
            OSError: If the socket cannot be created or connected.
        """
        if not _RFCOMM_AVAILABLE:
            raise PrinterError(
                "RFCOMM (Classic Bluetooth) is not available on this platform. "
                "BLE is recommended for cross-platform support."
            )
        sock = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
        try:
            # Set a reasonable timeout (10s) to avoid hanging indefinitely.
            sock.settimeout(10.0)
            # connect() blocks, so run it off the event loop thread.
            await asyncio.get_running_loop().run_in_executor(
                None, sock.connect, (self._address, self._channel)
            )
        except BaseException:
            # Failed or cancelled connect: __aexit__ will not run, so release
            # the descriptor here instead of leaking it.
            sock.close()
            raise
        self._sock = sock
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the socket, if any; close errors are logged, never raised."""
        sock, self._sock = self._sock, None
        if sock is None:
            return
        try:
            await asyncio.get_running_loop().run_in_executor(None, sock.close)
        except Exception as exc:
            # Best-effort cleanup: keep the original "never fail on close"
            # behaviour, but leave a trace instead of discarding the error.
            log.debug("Ignoring error while closing RFCOMM socket: %s", exc)

    def fileno(self):
        """Return the socket's file descriptor.

        Raises:
            PrinterError: If the socket is not connected.
        """
        if self._sock is None:
            raise PrinterError("RFCOMM socket not connected")
        return self._sock.fileno()

    def read(self, n: int) -> bytes:
        """Receive up to *n* bytes with a blocking ``recv``.

        Raises:
            PrinterError: If the socket is not connected.
        """
        if self._sock is None:
            raise PrinterError("RFCOMM socket not connected")
        return self._sock.recv(n)

    def write(self, data: bytes) -> None:
        """Send all of *data* with a blocking ``sendall``.

        Raises:
            PrinterError: If the socket is not connected.
        """
        if self._sock is None:
            raise PrinterError("RFCOMM socket not connected")
        self._sock.sendall(data)
|
|
|
|
|
|
# --- Printer Client ---
|
|
|
|
|
|
class PrinterClient:
    """Low-level printer protocol client.

    Drives a transport object through synchronous ``write(data)`` and
    ``read(n)`` calls (the interface ``RFCOMMClient`` provides), adding
    chunking, pacing and timeouts on top.
    """

    def __init__(self, transport):
        """Wrap *transport*, which must expose ``write(bytes)`` and ``read(int)``."""
        self._transport = transport
        self._seq = 0

    async def start(self):
        """Initialize the printer and verify it is ready.

        Raises:
            PrinterNotReady: If the status byte is non-zero.
            PrinterTimeout: If the status query gets no reply in time.
        """
        # Wake up the printer with a few null bytes (some printers need this).
        await self._write(b"\x00" * 12)
        # Enable printer (AiYin-specific command).
        await self._write(b"\x10\xFF\xFE\x01")
        # Get status to verify communication.
        status = await self.get_status()
        if not status.ok:
            # PrinterNotReady subclasses PrinterError, so existing
            # ``except PrinterError`` handlers keep working.
            raise PrinterNotReady(f"Printer not ready: {status}")

    async def get_status(self) -> PrinterStatus:
        """Query the real-time status byte and wrap it in ``PrinterStatus``."""
        await self._write(b"\x10\xFF\x40")
        resp = await self._read(1)
        return PrinterStatus(resp[0] if resp else 0)

    async def _query_text(self, command: bytes, size: int) -> str:
        """Send *command*, read *size* bytes, strip trailing NULs, decode ASCII."""
        await self._write(command)
        reply = await self._read(size)
        return reply.rstrip(b"\x00").decode("ascii", errors="ignore")

    async def get_info(self) -> dict[str, str | int]:
        """Query static printer information.

        Returns:
            A dict with string ``model``, ``firmware`` and ``serial`` entries
            and an integer ``battery`` entry (second byte of the 2-byte
            battery reply).

        Raises:
            PrinterTimeout: If any reply is not fully received in time.
        """
        model = await self._query_text(b"\x10\xFF\x20\xF0", 16)
        firmware = await self._query_text(b"\x10\xFF\x20\xF1", 16)
        serial = await self._query_text(b"\x10\xFF\x20\xF2", 32)

        await self._write(b"\x10\xFF\x50\xF1")
        # Read the reply exactly once: reading again would wait for bytes the
        # printer never sends and time out.
        battery_reply = await self._read(2)
        battery = battery_reply[1] if len(battery_reply) > 1 else 0

        return {"model": model, "firmware": firmware, "serial": serial, "battery": battery}

    async def get_all_info(self) -> dict[str, str]:
        """Query all printer information in one pipe-delimited response.

        Returns:
            A dict with ``bt_name``, ``bt_classic``, ``bt_ble``, ``firmware``,
            ``serial`` and ``battery`` (``"unknown"`` when the sixth field is
            absent), or ``{}`` when the reply has no ``|`` or fewer than five
            fields.
        """
        await self._write(b"\x10\xFF\x70")
        # NOTE(review): _read() insists on exactly 128 bytes, so a shorter
        # pipe-delimited reply raises PrinterTimeout instead of being parsed.
        # Confirm the printer pads this response.
        resp = await self._read(128)
        if not resp or b"|" not in resp:
            return {}
        parts = resp.decode("ascii", errors="ignore").strip().split("|")
        if len(parts) >= 5:
            return {
                "bt_name": parts[0],
                "bt_classic": parts[1],
                "bt_ble": parts[2],
                "firmware": parts[3],
                "serial": parts[4],
                "battery": parts[5] if len(parts) > 5 else "unknown",
            }
        return {}

    async def _write(self, data: bytes):
        """Write *data* in transport-sized chunks, pausing between chunks."""
        # RFCOMMClient is recognised by its ``_sock`` attribute and gets the
        # large chunk size; any other transport gets the small BLE size.
        # (The original condition had these two swapped.)
        is_classic = hasattr(self._transport, "_sock")
        chunk_size = CHUNK_SIZE_CLASSIC if is_classic else CHUNK_SIZE_BLE
        for offset in range(0, len(data), chunk_size):
            self._transport.write(data[offset : offset + chunk_size])
            await asyncio.sleep(DELAY_CHUNK_GAP)

    async def _read(self, n: int, timeout: float = 2.0) -> bytes:
        """Read exactly *n* bytes from the printer.

        Args:
            n: Number of bytes required.
            timeout: Seconds allowed for collecting all *n* bytes.

        Raises:
            PrinterTimeout: If *n* bytes have not arrived once *timeout* has
                elapsed. It subclasses ``PrinterError``.
        """
        loop = asyncio.get_running_loop()
        started = loop.time()
        buf = bytearray()
        while len(buf) < n:
            if loop.time() - started > timeout:
                raise PrinterTimeout(f"Timeout reading {n} bytes (got {len(buf)})")
            chunk = self._transport.read(min(256, n - len(buf)))
            if not chunk:
                # Nothing available yet; yield to the loop before polling again.
                await asyncio.sleep(DELAY_NOTIFY_EXTRA)
                continue
            buf.extend(chunk)
        return bytes(buf)

    async def print_raster(self, raster: bytes, label_height: int):
        """Send a raster image to the printer.

        Args:
            raster: Packed raster bytes, sent as-is after the header.
            label_height: Row count, encoded as a 2-byte little-endian value
                in the raster header.
        """
        # Enable printer.
        await self._write(b"\x10\xFF\xFE\x01")
        # Raster header (GS v 0): mode 0, 0x000C = 12 bytes per row (96 px),
        # then the height.
        header = bytearray([0x1D, 0x76, 0x30, 0x00, 0x0C, 0x00])
        header.extend(label_height.to_bytes(2, 'little'))
        await self._write(bytes(header))
        # Raster data.
        await self._write(raster)
        # Form feed.
        await self._write(b"\x1D\x0C")
        # Stop print.
        await self._write(b"\x10\xFF\xFE\x45")
        # No completion response is read here; just give the printer time.
        await asyncio.sleep(0.5)

    async def close(self):
        """Send the stop-print command; failures are logged, never raised."""
        try:
            # Stop any ongoing print job.
            await self._write(b"\x10\xFF\xFE\x45")
            await asyncio.sleep(0.2)
        except Exception as exc:
            # Best-effort shutdown: the link may already be gone.
            log.debug("Ignoring error while closing printer client: %s", exc)
|
|
|
|
|
|
# --- High-level API ---
|
|
|
|
|
|
@asynccontextmanager
async def connect(
    address: str | None = None,
    classic: bool = False,
    channel: int = RFCOMM_CHANNEL,
) -> AsyncGenerator[PrinterClient, None]:
    """Connect to the printer and yield a started ``PrinterClient``.

    Args:
        address: Printer address. Required for Classic Bluetooth; for BLE it
            is optional and the printer is discovered by name when omitted.
        classic: Use RFCOMM (Classic Bluetooth) instead of BLE.
        channel: Preferred RFCOMM channel; channels 1, 2 and 3 are tried
            afterwards as fallbacks.

    Yields:
        A ``PrinterClient`` on which ``start()`` has already succeeded.

    Raises:
        PrinterError: If ``classic`` is set without an address, if every
            RFCOMM channel fails, or if BLE setup fails. Exceptions raised
            inside the caller's ``async with`` body propagate unchanged.
    """
    if classic:
        if not address:
            raise PrinterError("--address is required for Classic Bluetooth (no scanning)")

        # Preferred channel first, then the common ones, without repeating a
        # channel and skipping non-positive values.
        candidates = [ch for ch in dict.fromkeys((channel, 1, 2, 3)) if ch > 0]
        last_error: Exception | None = None
        for ch in candidates:
            ready = False
            try:
                async with RFCOMMClient(address, ch) as client:
                    pc = PrinterClient(client)
                    await pc.start()
                    ready = True
                    yield pc
                    return
            except (PrinterError, OSError) as exc:
                if ready:
                    # Raised by the caller's body after a successful connect.
                    # Retrying would make this generator yield twice, so
                    # hand the error back untouched.
                    raise
                # Connection or start-up failed on this channel; try the next.
                last_error = exc
                log.debug("RFCOMM channel %s failed: %s", ch, exc)

        tried = ", ".join(str(ch) for ch in candidates)
        raise PrinterError(
            f"Classic Bluetooth connection failed for '{address}' after trying channels {tried}"
        ) from last_error

    # BLE connection.
    # NOTE(review): PrinterClient drives its transport through write()/read();
    # confirm BleakClient provides those (its documented GATT API is
    # write_gatt_char/start_notify) or that an adapter is expected here.
    ready = False
    try:
        target = await resolve_ble_target(address)
        async with BleakClient(target) as client:
            pc = PrinterClient(client)
            await pc.start()
            ready = True
            yield pc
    except Exception as exc:
        if ready:
            # Error from the caller's body or teardown, not from connecting;
            # do not relabel it as a connection failure.
            raise
        raise PrinterError(f"BLE connection failed: {exc}") from exc