""" Fichero / D11s thermal label printer - BLE interface. Protocol reverse-engineered from decompiled Fichero APK (com.lj.fichero). Device class: AiYinNormalDevice (LuckPrinter SDK) 96px wide printhead (12 bytes/row), 203 DPI, prints 1-bit raster images. """ import argparse import asyncio import os import sys from contextlib import asynccontextmanager from bleak import BleakClient, BleakGATTCharacteristic, BleakScanner from PIL import Image, ImageDraw, ImageFont PRINTER_NAME_PREFIXES = ("FICHERO", "D11s_") async def find_printer() -> str: """Scan BLE for a Fichero/D11s printer. Returns the address.""" print("Scanning for printer...") devices = await BleakScanner.discover(timeout=8) for d in devices: if d.name and any(d.name.startswith(p) for p in PRINTER_NAME_PREFIXES): print(f" Found {d.name} at {d.address}") return d.address print(" ERROR: No Fichero/D11s printer found. Is it turned on?") sys.exit(1) # Using the 18f0 service (any of the four BLE UART services work) WRITE_UUID = "00002af1-0000-1000-8000-00805f9b34fb" NOTIFY_UUID = "00002af0-0000-1000-8000-00805f9b34fb" PRINTHEAD_PX = 96 BYTES_PER_ROW = PRINTHEAD_PX // 8 # 12 CHUNK_SIZE = 200 # Paper types for 10 FF 84 00 nn PAPER_GAP = 0x00 PAPER_BLACK_MARK = 0x01 PAPER_CONTINUOUS = 0x02 class PrinterStatus: """Parsed status byte from 10 FF 40.""" def __init__(self, byte: int): self.raw = byte self.printing = bool(byte & 0x01) self.cover_open = bool(byte & 0x02) self.no_paper = bool(byte & 0x04) self.low_battery = bool(byte & 0x08) self.overheated = bool(byte & 0x10 or byte & 0x40) self.charging = bool(byte & 0x20) def __str__(self): flags = [] if self.printing: flags.append("printing") if self.cover_open: flags.append("cover open") if self.no_paper: flags.append("no paper") if self.low_battery: flags.append("low battery") if self.overheated: flags.append("overheated") if self.charging: flags.append("charging") return ", ".join(flags) if flags else "ready" @property def ok(self) -> bool: return not (self.cover_open or self.no_paper or self.overheated) class PrinterClient: def __init__(self, client: BleakClient): self.client = client self._buf = bytearray() self._event = asyncio.Event() def _on_notify(self, _char: BleakGATTCharacteristic, data: bytearray): self._buf.extend(data) self._event.set() async def start(self): await self.client.start_notify(NOTIFY_UUID, self._on_notify) async def send(self, data: bytes, wait: bool = False, timeout: float = 2.0) -> bytes: if wait: self._buf.clear() self._event.clear() await self.client.write_gatt_char(WRITE_UUID, data, response=False) if wait: try: await asyncio.wait_for(self._event.wait(), timeout=timeout) await asyncio.sleep(0.05) except asyncio.TimeoutError: pass return bytes(self._buf) async def send_chunked(self, data: bytes, chunk_size: int = CHUNK_SIZE): for i in range(0, len(data), chunk_size): chunk = data[i : i + chunk_size] await self.client.write_gatt_char(WRITE_UUID, chunk, response=False) await asyncio.sleep(0.02) # --- Info commands (all tested and confirmed on D11s fw 2.4.6) --- async def get_model(self) -> str: r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF0]), wait=True) return r.decode(errors="replace").strip() if r else "?" async def get_firmware(self) -> str: r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF1]), wait=True) return r.decode(errors="replace").strip() if r else "?" async def get_serial(self) -> str: r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF2]), wait=True) return r.decode(errors="replace").strip() if r else "?" async def get_boot_version(self) -> str: r = await self.send(bytes([0x10, 0xFF, 0x20, 0xEF]), wait=True) return r.decode(errors="replace").strip() if r else "?" async def get_battery(self) -> int: r = await self.send(bytes([0x10, 0xFF, 0x50, 0xF1]), wait=True) if r and len(r) >= 2: return r[-1] return -1 async def get_status(self) -> PrinterStatus: r = await self.send(bytes([0x10, 0xFF, 0x40]), wait=True) if r: return PrinterStatus(r[-1]) return PrinterStatus(0xFF) async def get_density(self) -> bytes: r = await self.send(bytes([0x10, 0xFF, 0x11]), wait=True) return r async def get_shutdown_time(self) -> int: """Returns auto-shutdown timeout in minutes.""" r = await self.send(bytes([0x10, 0xFF, 0x13]), wait=True) if r and len(r) >= 2: return (r[0] << 8) | r[1] return -1 async def get_all_info(self) -> dict: """10 FF 70: returns pipe-delimited string with all device info.""" r = await self.send(bytes([0x10, 0xFF, 0x70]), wait=True) if not r: return {} parts = r.decode(errors="replace").split("|") if len(parts) >= 6: return { "bt_name": parts[0], "mac_classic": parts[1], "mac_ble": parts[2], "firmware": parts[3], "serial": parts[4], "battery": f"{parts[5]}%", } return {"raw": r.decode(errors="replace")} # --- Config commands (tested on D11s) --- async def set_density(self, level: int) -> bool: """0=light, 1=medium, 2=thick. Returns True if printer responded OK.""" r = await self.send(bytes([0x10, 0xFF, 0x10, 0x00, level]), wait=True) return r == b"OK" async def set_paper_type(self, paper: int = PAPER_GAP) -> bool: """0=gap/label, 1=black mark, 2=continuous.""" r = await self.send(bytes([0x10, 0xFF, 0x84, paper]), wait=True) return r == b"OK" async def set_shutdown_time(self, minutes: int) -> bool: hi = (minutes >> 8) & 0xFF lo = minutes & 0xFF r = await self.send(bytes([0x10, 0xFF, 0x12, hi, lo]), wait=True) return r == b"OK" async def factory_reset(self) -> bool: r = await self.send(bytes([0x10, 0xFF, 0x04]), wait=True) return r == b"OK" # --- Print control (AiYin-specific, from decompiled APK) --- async def wakeup(self): await self.send(b"\x00" * 12) async def enable(self): """AiYin enable: 10 FF FE 01 (NOT 10 FF F1 03).""" await self.send(bytes([0x10, 0xFF, 0xFE, 0x01])) async def feed_dots(self, dots: int): """Feed paper forward by n dots.""" await self.send(bytes([0x1B, 0x4A, dots & 0xFF])) async def form_feed(self): """Position to next label.""" await self.send(bytes([0x1D, 0x0C])) async def stop_print(self) -> bool: """AiYin stop: 10 FF FE 45. Waits for 0xAA or 'OK'.""" r = await self.send(bytes([0x10, 0xFF, 0xFE, 0x45]), wait=True, timeout=60.0) if r: return r[0] == 0xAA or r.startswith(b"OK") return False async def get_info(self) -> dict: status = await self.get_status() return { "model": await self.get_model(), "firmware": await self.get_firmware(), "boot": await self.get_boot_version(), "serial": await self.get_serial(), "battery": f"{await self.get_battery()}%", "status": str(status), "shutdown": f"{await self.get_shutdown_time()} min", } @asynccontextmanager async def connect(address=None): """Discover printer, connect, and yield a ready PrinterClient.""" addr = address or await find_printer() async with BleakClient(addr) as client: pc = PrinterClient(client) await pc.start() yield pc # --- Image handling --- def prepare_image(img: Image.Image, max_rows: int = 240) -> Image.Image: """Convert any image to 96px wide, 1-bit, black on white.""" img = img.convert("L") w, h = img.size new_h = int(h * (PRINTHEAD_PX / w)) if new_h > max_rows: new_h = max_rows img = img.resize((PRINTHEAD_PX, new_h), Image.LANCZOS) img = img.point(lambda x: 1 if x < 128 else 0, "1") return img def image_to_raster(img: Image.Image) -> bytes: """Pack 1-bit image into raw raster bytes, MSB first.""" return img.tobytes() def text_to_image(text: str, label_height: int = 240) -> Image.Image: """Render text in landscape, then rotate 90 degrees for label printing.""" canvas_w = label_height canvas_h = PRINTHEAD_PX img = Image.new("L", (canvas_w, canvas_h), 255) draw = ImageDraw.Draw(img) font = ImageFont.load_default(size=24) bbox = draw.textbbox((0, 0), text, font=font) tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1] x = (canvas_w - tw) // 2 y = (canvas_h - th) // 2 draw.text((x, y), text, fill=0, font=font) img = img.rotate(90, expand=True) return img # --- Print sequence (from decompiled Fichero APK: AiYinNormalDevice) --- async def do_print( pc: PrinterClient, img: Image.Image, density: int = 1, paper: int = PAPER_GAP, copies: int = 1, ): img = prepare_image(img) rows = img.height raster = image_to_raster(img) print(f" Image: {img.width}x{rows}, {len(raster)} bytes, {copies} copies") status = await pc.get_status() if not status.ok: print(f" ERROR: printer not ready ({status})") return False await pc.set_density(density) await asyncio.sleep(0.1) for copy_num in range(copies): if copies > 1: print(f" Copy {copy_num + 1}/{copies}...") # AiYin print sequence (from decompiled APK) await pc.set_paper_type(paper) await asyncio.sleep(0.05) await pc.wakeup() await asyncio.sleep(0.05) await pc.enable() await asyncio.sleep(0.05) # Raster image: GS v 0 m xL xH yL yH yl = rows & 0xFF yh = (rows >> 8) & 0xFF header = bytes([0x1D, 0x76, 0x30, 0x00, BYTES_PER_ROW, 0x00, yl, yh]) await pc.send_chunked(header + raster) await asyncio.sleep(0.5) await pc.form_feed() await asyncio.sleep(0.3) ok = await pc.stop_print() if not ok: print(" WARNING: no OK/0xAA from stop command") return True # --- CLI --- async def cmd_info(args): async with connect(args.address) as pc: info = await pc.get_info() for k, v in info.items(): print(f" {k}: {v}") print() all_info = await pc.get_all_info() for k, v in all_info.items(): print(f" {k}: {v}") async def cmd_status(args): async with connect(args.address) as pc: status = await pc.get_status() print(f" Status: {status}") print(f" Raw: 0x{status.raw:02X} ({status.raw:08b})") print(f" printing={status.printing} cover_open={status.cover_open} " f"no_paper={status.no_paper} low_battery={status.low_battery} " f"overheated={status.overheated} charging={status.charging}") async def cmd_text(args): text = " ".join(args.text) img = text_to_image(text) async with connect(args.address) as pc: print(f'Printing "{text}"...') ok = await do_print(pc, img, args.density, copies=args.copies) print("Done." if ok else "FAILED.") async def cmd_image(args): img = Image.open(args.path) async with connect(args.address) as pc: print(f"Printing {args.path}...") ok = await do_print(pc, img, args.density, copies=args.copies) print("Done." if ok else "FAILED.") async def cmd_set(args): async with connect(args.address) as pc: if args.setting == "density": ok = await pc.set_density(int(args.value)) print(f" Set density={args.value}: {'OK' if ok else 'FAILED'}") elif args.setting == "shutdown": ok = await pc.set_shutdown_time(int(args.value)) print(f" Set shutdown={args.value}min: {'OK' if ok else 'FAILED'}") elif args.setting == "paper": types = {"gap": 0, "black": 1, "continuous": 2} ok = await pc.set_paper_type(types.get(args.value, int(args.value))) print(f" Set paper={args.value}: {'OK' if ok else 'FAILED'}") else: print(f" Unknown setting: {args.setting}") print(" Available: density (0-2), shutdown (minutes), paper (gap/black/continuous)") def main(): parser = argparse.ArgumentParser(description="Fichero D11s Label Printer") parser.add_argument("--address", default=os.environ.get("FICHERO_ADDR"), help="BLE address (skip scanning, or set FICHERO_ADDR)") sub = parser.add_subparsers(dest="command", required=True) p_info = sub.add_parser("info", help="Show device info") p_info.set_defaults(func=cmd_info) p_status = sub.add_parser("status", help="Show detailed status") p_status.set_defaults(func=cmd_status) p_text = sub.add_parser("text", help="Print text label") p_text.add_argument("text", nargs="+", help="Text to print") p_text.add_argument("--density", type=int, default=1, choices=[0, 1, 2], help="Print density: 0=light, 1=medium, 2=thick") p_text.add_argument("--copies", type=int, default=1, help="Number of copies") p_text.set_defaults(func=cmd_text) p_image = sub.add_parser("image", help="Print image file") p_image.add_argument("path", help="Path to image file") p_image.add_argument("--density", type=int, default=1, choices=[0, 1, 2], help="Print density: 0=light, 1=medium, 2=thick") p_image.add_argument("--copies", type=int, default=1, help="Number of copies") p_image.set_defaults(func=cmd_image) p_set = sub.add_parser("set", help="Change printer settings") p_set.add_argument("setting", choices=["density", "shutdown", "paper"], help="Setting to change") p_set.add_argument("value", help="New value") p_set.set_defaults(func=cmd_set) args = parser.parse_args() asyncio.run(args.func(args)) if __name__ == "__main__": main()