Restructure into fichero/ package and fix review issues

Split monolithic printer.py into fichero/{printer,imaging,cli}.py.
Add asyncio.Lock to prevent notification buffer races, replace sys.exit
with PrinterNotFound/PrinterTimeout/PrinterNotReady exceptions, name all
magic sleep values as DELAY_* constants, add image_to_raster validation,
add --font-size and --label-height to text subcommand, add input
validation to set command, add type annotations to public API, fix
misleading paper type comment, remove duplicate PROTOCOL.md section,
update README for new CLI entry point and library usage.
This commit is contained in:
Hamza
2026-02-24 01:07:25 +01:00
parent d8af89d326
commit 1fdbbe20bd
8 changed files with 383 additions and 259 deletions

21
fichero/__init__.py Normal file
View File

@@ -0,0 +1,21 @@
"""Fichero D11s thermal label printer - BLE interface."""
from fichero.printer import (
PrinterClient,
PrinterError,
PrinterNotFound,
PrinterNotReady,
PrinterStatus,
PrinterTimeout,
connect,
)
__all__ = [
"PrinterClient",
"PrinterError",
"PrinterNotFound",
"PrinterNotReady",
"PrinterStatus",
"PrinterTimeout",
"connect",
]

190
fichero/cli.py Normal file
View File

@@ -0,0 +1,190 @@
"""CLI for Fichero D11s thermal label printer."""
import argparse
import asyncio
import os
import sys
from PIL import Image
from fichero.imaging import image_to_raster, prepare_image, text_to_image
from fichero.printer import (
BYTES_PER_ROW,
DELAY_AFTER_DENSITY,
DELAY_AFTER_FEED,
DELAY_COMMAND_GAP,
DELAY_RASTER_SETTLE,
PAPER_GAP,
PrinterClient,
PrinterError,
PrinterNotReady,
connect,
)
async def do_print(
pc: PrinterClient,
img: Image.Image,
density: int = 1,
paper: int = PAPER_GAP,
copies: int = 1,
) -> bool:
img = prepare_image(img)
rows = img.height
raster = image_to_raster(img)
print(f" Image: {img.width}x{rows}, {len(raster)} bytes, {copies} copies")
status = await pc.get_status()
if not status.ok:
raise PrinterNotReady(f"Printer not ready: {status}")
await pc.set_density(density)
await asyncio.sleep(DELAY_AFTER_DENSITY)
for copy_num in range(copies):
if copies > 1:
print(f" Copy {copy_num + 1}/{copies}...")
# AiYin print sequence (from decompiled APK)
await pc.set_paper_type(paper)
await asyncio.sleep(DELAY_COMMAND_GAP)
await pc.wakeup()
await asyncio.sleep(DELAY_COMMAND_GAP)
await pc.enable()
await asyncio.sleep(DELAY_COMMAND_GAP)
# Raster image: GS v 0 m xL xH yL yH <data>
yl = rows & 0xFF
yh = (rows >> 8) & 0xFF
header = bytes([0x1D, 0x76, 0x30, 0x00, BYTES_PER_ROW, 0x00, yl, yh])
await pc.send_chunked(header + raster)
await asyncio.sleep(DELAY_RASTER_SETTLE)
await pc.form_feed()
await asyncio.sleep(DELAY_AFTER_FEED)
ok = await pc.stop_print()
if not ok:
print(" WARNING: no OK/0xAA from stop command")
return True
async def cmd_info(args: argparse.Namespace) -> None:
async with connect(args.address) as pc:
info = await pc.get_info()
for k, v in info.items():
print(f" {k}: {v}")
print()
all_info = await pc.get_all_info()
for k, v in all_info.items():
print(f" {k}: {v}")
async def cmd_status(args: argparse.Namespace) -> None:
async with connect(args.address) as pc:
status = await pc.get_status()
print(f" Status: {status}")
print(f" Raw: 0x{status.raw:02X} ({status.raw:08b})")
print(f" printing={status.printing} cover_open={status.cover_open} "
f"no_paper={status.no_paper} low_battery={status.low_battery} "
f"overheated={status.overheated} charging={status.charging}")
async def cmd_text(args: argparse.Namespace) -> None:
text = " ".join(args.text)
img = text_to_image(text, font_size=args.font_size, label_height=args.label_height)
async with connect(args.address) as pc:
print(f'Printing "{text}"...')
ok = await do_print(pc, img, args.density, copies=args.copies)
print("Done." if ok else "FAILED.")
async def cmd_image(args: argparse.Namespace) -> None:
img = Image.open(args.path)
async with connect(args.address) as pc:
print(f"Printing {args.path}...")
ok = await do_print(pc, img, args.density, copies=args.copies)
print("Done." if ok else "FAILED.")
async def cmd_set(args: argparse.Namespace) -> None:
async with connect(args.address) as pc:
if args.setting == "density":
val = int(args.value)
if not 0 <= val <= 2:
print(" ERROR: density must be 0, 1, or 2")
return
ok = await pc.set_density(val)
print(f" Set density={args.value}: {'OK' if ok else 'FAILED'}")
elif args.setting == "shutdown":
val = int(args.value)
if not 1 <= val <= 480:
print(" ERROR: shutdown must be 1-480 minutes")
return
ok = await pc.set_shutdown_time(val)
print(f" Set shutdown={args.value}min: {'OK' if ok else 'FAILED'}")
elif args.setting == "paper":
types = {"gap": 0, "black": 1, "continuous": 2}
if args.value in types:
val = types[args.value]
else:
try:
val = int(args.value)
except ValueError:
print(" ERROR: paper must be gap, black, continuous, or 0-2")
return
if not 0 <= val <= 2:
print(" ERROR: paper must be gap, black, continuous, or 0-2")
return
ok = await pc.set_paper_type(val)
print(f" Set paper={args.value}: {'OK' if ok else 'FAILED'}")
def main() -> None:
parser = argparse.ArgumentParser(description="Fichero D11s Label Printer")
parser.add_argument("--address", default=os.environ.get("FICHERO_ADDR"),
help="BLE address (skip scanning, or set FICHERO_ADDR)")
sub = parser.add_subparsers(dest="command", required=True)
p_info = sub.add_parser("info", help="Show device info")
p_info.set_defaults(func=cmd_info)
p_status = sub.add_parser("status", help="Show detailed status")
p_status.set_defaults(func=cmd_status)
p_text = sub.add_parser("text", help="Print text label")
p_text.add_argument("text", nargs="+", help="Text to print")
p_text.add_argument("--density", type=int, default=1, choices=[0, 1, 2],
help="Print density: 0=light, 1=medium, 2=thick")
p_text.add_argument("--copies", type=int, default=1, help="Number of copies")
p_text.add_argument("--font-size", type=int, default=24, help="Font size in points")
p_text.add_argument("--label-height", type=int, default=240,
help="Label height in pixels (default: 240)")
p_text.set_defaults(func=cmd_text)
p_image = sub.add_parser("image", help="Print image file")
p_image.add_argument("path", help="Path to image file")
p_image.add_argument("--density", type=int, default=1, choices=[0, 1, 2],
help="Print density: 0=light, 1=medium, 2=thick")
p_image.add_argument("--copies", type=int, default=1, help="Number of copies")
p_image.set_defaults(func=cmd_image)
p_set = sub.add_parser("set", help="Change printer settings")
p_set.add_argument("setting", choices=["density", "shutdown", "paper"],
help="Setting to change")
p_set.add_argument("value", help="New value")
p_set.set_defaults(func=cmd_set)
args = parser.parse_args()
try:
asyncio.run(args.func(args))
except PrinterError as e:
print(f" ERROR: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

50
fichero/imaging.py Normal file
View File

@@ -0,0 +1,50 @@
"""Image processing for Fichero D11s thermal label printer."""
import logging
from PIL import Image, ImageDraw, ImageFont
from fichero.printer import PRINTHEAD_PX
log = logging.getLogger(__name__)
def prepare_image(img: Image.Image, max_rows: int = 240) -> Image.Image:
"""Convert any image to 96px wide, 1-bit, black on white."""
img = img.convert("L")
w, h = img.size
new_h = int(h * (PRINTHEAD_PX / w))
if new_h > max_rows:
log.warning("Image height %dpx exceeds max %dpx, cropping bottom", new_h, max_rows)
new_h = max_rows
img = img.resize((PRINTHEAD_PX, new_h), Image.LANCZOS)
img = img.point(lambda x: 1 if x < 128 else 0, "1")
return img
def image_to_raster(img: Image.Image) -> bytes:
"""Pack 1-bit image into raw raster bytes, MSB first."""
if img.mode != "1":
raise ValueError(f"Expected mode '1', got '{img.mode}'")
if img.width != PRINTHEAD_PX:
raise ValueError(f"Expected width {PRINTHEAD_PX}, got {img.width}")
return img.tobytes()
def text_to_image(text: str, font_size: int = 24, label_height: int = 240) -> Image.Image:
"""Render text in landscape, then rotate 90 degrees for label printing."""
canvas_w = label_height
canvas_h = PRINTHEAD_PX
img = Image.new("L", (canvas_w, canvas_h), 255)
draw = ImageDraw.Draw(img)
font = ImageFont.load_default(size=font_size)
bbox = draw.textbbox((0, 0), text, font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
x = (canvas_w - tw) // 2
y = (canvas_h - th) // 2
draw.text((x, y), text, fill=0, font=font)
img = img.rotate(90, expand=True)
return img

275
fichero/printer.py Normal file
View File

@@ -0,0 +1,275 @@
"""
Fichero / D11s thermal label printer - BLE interface.
Protocol reverse-engineered from decompiled Fichero APK (com.lj.fichero).
Device class: AiYinNormalDevice (LuckPrinter SDK)
96px wide printhead (12 bytes/row), 203 DPI, prints 1-bit raster images.
"""
import asyncio
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from bleak import BleakClient, BleakGATTCharacteristic, BleakScanner
# --- BLE identifiers ---
PRINTER_NAME_PREFIXES = ("FICHERO", "D11s_")
# Using the 18f0 service (any of the four BLE UART services work)
WRITE_UUID = "00002af1-0000-1000-8000-00805f9b34fb"
NOTIFY_UUID = "00002af0-0000-1000-8000-00805f9b34fb"
# --- Printhead ---
PRINTHEAD_PX = 96
BYTES_PER_ROW = PRINTHEAD_PX // 8 # 12
CHUNK_SIZE = 200
# --- Paper types for 10 FF 84 nn ---
PAPER_GAP = 0x00
PAPER_BLACK_MARK = 0x01
PAPER_CONTINUOUS = 0x02
# --- Timing (seconds) - empirically tuned against D11s fw 2.4.6 ---
DELAY_AFTER_DENSITY = 0.10 # printer needs time to apply density setting
DELAY_COMMAND_GAP = 0.05 # minimum gap between sequential commands
DELAY_CHUNK_GAP = 0.02 # inter-chunk pacing for BLE throughput
DELAY_RASTER_SETTLE = 0.50 # wait for printhead after raster transfer
DELAY_AFTER_FEED = 0.30 # wait after form feed before stop command
DELAY_NOTIFY_EXTRA = 0.05 # extra wait for trailing BLE notification fragments
# --- Exceptions ---
class PrinterError(Exception):
"""Base exception for printer operations."""
class PrinterNotFound(PrinterError):
"""No Fichero/D11s printer found during BLE scan."""
class PrinterTimeout(PrinterError):
"""Printer did not respond within the expected time."""
class PrinterNotReady(PrinterError):
"""Printer status indicates it cannot print."""
# --- Discovery ---
async def find_printer() -> str:
"""Scan BLE for a Fichero/D11s printer. Returns the address."""
print("Scanning for printer...")
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.name and any(d.name.startswith(p) for p in PRINTER_NAME_PREFIXES):
print(f" Found {d.name} at {d.address}")
return d.address
raise PrinterNotFound("No Fichero/D11s printer found. Is it turned on?")
# --- Status ---
class PrinterStatus:
"""Parsed status byte from 10 FF 40."""
def __init__(self, byte: int):
self.raw = byte
self.printing = bool(byte & 0x01)
self.cover_open = bool(byte & 0x02)
self.no_paper = bool(byte & 0x04)
self.low_battery = bool(byte & 0x08)
self.overheated = bool(byte & 0x10 or byte & 0x40)
self.charging = bool(byte & 0x20)
def __str__(self) -> str:
flags = []
if self.printing:
flags.append("printing")
if self.cover_open:
flags.append("cover open")
if self.no_paper:
flags.append("no paper")
if self.low_battery:
flags.append("low battery")
if self.overheated:
flags.append("overheated")
if self.charging:
flags.append("charging")
return ", ".join(flags) if flags else "ready"
@property
def ok(self) -> bool:
return not (self.cover_open or self.no_paper or self.overheated)
# --- Client ---
class PrinterClient:
def __init__(self, client: BleakClient):
self.client = client
self._buf = bytearray()
self._event = asyncio.Event()
self._lock = asyncio.Lock()
def _on_notify(self, _char: BleakGATTCharacteristic, data: bytearray) -> None:
self._buf.extend(data)
self._event.set()
async def start(self) -> None:
await self.client.start_notify(NOTIFY_UUID, self._on_notify)
async def send(self, data: bytes, wait: bool = False, timeout: float = 2.0) -> bytes:
async with self._lock:
if wait:
self._buf.clear()
self._event.clear()
await self.client.write_gatt_char(WRITE_UUID, data, response=False)
if wait:
try:
await asyncio.wait_for(self._event.wait(), timeout=timeout)
await asyncio.sleep(DELAY_NOTIFY_EXTRA)
except asyncio.TimeoutError:
raise PrinterTimeout(f"No response within {timeout}s")
return bytes(self._buf)
async def send_chunked(self, data: bytes, chunk_size: int = CHUNK_SIZE) -> None:
for i in range(0, len(data), chunk_size):
chunk = data[i : i + chunk_size]
await self.client.write_gatt_char(WRITE_UUID, chunk, response=False)
await asyncio.sleep(DELAY_CHUNK_GAP)
# --- Info commands (all tested and confirmed on D11s fw 2.4.6) ---
async def get_model(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF0]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_firmware(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF1]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_serial(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF2]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_boot_version(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xEF]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_battery(self) -> int:
r = await self.send(bytes([0x10, 0xFF, 0x50, 0xF1]), wait=True)
if r and len(r) >= 2:
return r[-1]
return -1
async def get_status(self) -> PrinterStatus:
r = await self.send(bytes([0x10, 0xFF, 0x40]), wait=True)
if r:
return PrinterStatus(r[-1])
return PrinterStatus(0xFF)
async def get_density(self) -> bytes:
r = await self.send(bytes([0x10, 0xFF, 0x11]), wait=True)
return r
async def get_shutdown_time(self) -> int:
"""Returns auto-shutdown timeout in minutes."""
r = await self.send(bytes([0x10, 0xFF, 0x13]), wait=True)
if r and len(r) >= 2:
return (r[0] << 8) | r[1]
return -1
async def get_all_info(self) -> dict:
"""10 FF 70: returns pipe-delimited string with all device info."""
r = await self.send(bytes([0x10, 0xFF, 0x70]), wait=True)
if not r:
return {}
parts = r.decode(errors="replace").split("|")
if len(parts) >= 6:
return {
"bt_name": parts[0],
"mac_classic": parts[1],
"mac_ble": parts[2],
"firmware": parts[3],
"serial": parts[4],
"battery": f"{parts[5]}%",
}
return {"raw": r.decode(errors="replace")}
# --- Config commands (tested on D11s) ---
async def set_density(self, level: int) -> bool:
"""0=light, 1=medium, 2=thick. Returns True if printer responded OK."""
r = await self.send(bytes([0x10, 0xFF, 0x10, 0x00, level]), wait=True)
return r == b"OK"
async def set_paper_type(self, paper: int = PAPER_GAP) -> bool:
"""0=gap/label, 1=black mark, 2=continuous."""
r = await self.send(bytes([0x10, 0xFF, 0x84, paper]), wait=True)
return r == b"OK"
async def set_shutdown_time(self, minutes: int) -> bool:
hi = (minutes >> 8) & 0xFF
lo = minutes & 0xFF
r = await self.send(bytes([0x10, 0xFF, 0x12, hi, lo]), wait=True)
return r == b"OK"
async def factory_reset(self) -> bool:
r = await self.send(bytes([0x10, 0xFF, 0x04]), wait=True)
return r == b"OK"
# --- Print control (AiYin-specific, from decompiled APK) ---
async def wakeup(self) -> None:
await self.send(b"\x00" * 12)
async def enable(self) -> None:
"""AiYin enable: 10 FF FE 01 (NOT 10 FF F1 03)."""
await self.send(bytes([0x10, 0xFF, 0xFE, 0x01]))
async def feed_dots(self, dots: int) -> None:
"""Feed paper forward by n dots."""
await self.send(bytes([0x1B, 0x4A, dots & 0xFF]))
async def form_feed(self) -> None:
"""Position to next label."""
await self.send(bytes([0x1D, 0x0C]))
async def stop_print(self) -> bool:
"""AiYin stop: 10 FF FE 45. Waits for 0xAA or 'OK'."""
r = await self.send(bytes([0x10, 0xFF, 0xFE, 0x45]), wait=True, timeout=60.0)
if r:
return r[0] == 0xAA or r.startswith(b"OK")
return False
async def get_info(self) -> dict:
status = await self.get_status()
return {
"model": await self.get_model(),
"firmware": await self.get_firmware(),
"boot": await self.get_boot_version(),
"serial": await self.get_serial(),
"battery": f"{await self.get_battery()}%",
"status": str(status),
"shutdown": f"{await self.get_shutdown_time()} min",
}
@asynccontextmanager
async def connect(address: str | None = None) -> AsyncGenerator[PrinterClient, None]:
"""Discover printer, connect, and yield a ready PrinterClient."""
addr = address or await find_printer()
async with BleakClient(addr) as client:
pc = PrinterClient(client)
await pc.start()
yield pc