7 Commits

Author SHA1 Message Date
paul2212
48c40d9f8f 0.1.27 2026-03-16 14:17:49 +01:00
a7fabdae54 Merge pull request '0.1.25' (#6) from 0.1.25 into main
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled
Reviewed-on: #6
2026-03-16 12:48:37 +00:00
paul2212
fef3d18d3f 0.1.25 2026-03-16 13:48:12 +01:00
66c3f06b48 Merge pull request 'refactor' (#5) from 0.1.23 into main
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled
Reviewed-on: #5
2026-03-16 11:44:45 +00:00
paul2212
7843a38407 refactor 2026-03-16 12:43:26 +01:00
eee58431ab update version
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled
2026-03-16 09:26:06 +00:00
40a1f78f55 Merge pull request 'release/0.1.20' (#4) from release/0.1.20 into main
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled
Reviewed-on: #4
2026-03-16 09:21:35 +00:00
16 changed files with 242 additions and 1440 deletions

View File

@@ -4,6 +4,49 @@ All notable changes to this project are documented in this file.
The format is based on Keep a Changelog and this project uses Semantic Versioning. The format is based on Keep a Changelog and this project uses Semantic Versioning.
## [0.1.27] - 2026-03-16
### Changed
- **Project Structure**: Moved the `fichero` library into `fichero_printer/` to make the add-on self-contained. This simplifies the build process and removes the need for synchronization scripts.
- Fixed invalid duplicate `version` keys in `pyproject.toml` and `config.yaml`.
## [0.1.26] - 2026-03-16
### Fixed
- **Build Process**: Fixed `too many links` Docker build error by removing the symlink-based approach. Introduced a `sync_addon.sh` script to automate copying the library into the add-on directory, which is required for the Home Assistant build system.
## [0.1.25] - 2026-03-08
### Changed
- **Build Process**: Replaced the manually copied `fichero` directory inside the Home Assistant add-on with a symbolic link. This eliminates code duplication and automates synchronization, simplifying the build process.
## [0.1.24] - 2026-03-08
### Fixed
- **Home Assistant Build**: Reverted the add-on's `Dockerfile` to a vendored code approach to resolve build failures caused by the Home Assistant build system's inability to access files outside the add-on directory. The add-on is now self-contained again.
## [0.1.23] - 2026-03-08
### Changed
- Updated the Home Assistant add-on's `Dockerfile` to install the main library as a package, completing the project structure refactoring.
- Added `python-multipart` as an explicit dependency for the API server.
## [0.1.22] - 2026-03-08
### Changed
- **Refactored Project Structure**: Eliminated duplicated code by converting the project into an installable Python package. The Home Assistant add-on now installs the main library as a dependency instead of using a vendored copy, improving maintainability and preventing sync issues.
## [0.1.21] - 2026-03-08
### Fixed
- Synchronized the Home Assistant add-on's source code (`fichero_printer/fichero/`) with the main library to fix stale code and version mismatch issues.
## [0.1.20] - 2026-03-08 ## [0.1.20] - 2026-03-08
### Changed ### Changed

View File

@@ -1,25 +0,0 @@
"""Fichero D11s thermal label printer - BLE + Classic Bluetooth interface."""
from fichero.printer import (
RFCOMM_CHANNEL,
PrinterClient,
PrinterError,
PrinterNotFound,
PrinterNotReady,
PrinterStatus,
PrinterTimeout,
RFCOMMClient,
connect,
)
__all__ = [
"RFCOMM_CHANNEL",
"PrinterClient",
"PrinterError",
"PrinterNotFound",
"PrinterNotReady",
"PrinterStatus",
"PrinterTimeout",
"RFCOMMClient",
"connect",
]

View File

@@ -75,7 +75,7 @@ async def lifespan(app: FastAPI): # noqa: ARG001
app = FastAPI( app = FastAPI(
title="Fichero Printer API", title="Fichero Printer API",
description="REST API for the Fichero D11s (AiYin) thermal label printer.", description="REST API for the Fichero D11s (AiYin) thermal label printer.",
version="0.1.20", version="0.1.27",
lifespan=lifespan, lifespan=lifespan,
docs_url=None, docs_url=None,
redoc_url=None, redoc_url=None,

View File

@@ -1,251 +0,0 @@
"""CLI for Fichero D11s thermal label printer."""
import argparse
import asyncio
import os
import sys
from PIL import Image
from fichero.imaging import image_to_raster, prepare_image, text_to_image
from fichero.printer import (
BYTES_PER_ROW,
DELAY_AFTER_DENSITY,
DELAY_AFTER_FEED,
DELAY_COMMAND_GAP,
DELAY_RASTER_SETTLE,
PAPER_GAP,
PrinterClient,
PrinterError,
PrinterNotReady,
connect,
)
DOTS_PER_MM = 8 # 203 DPI
def _resolve_label_height(args: argparse.Namespace) -> int:
"""Return label height in pixels from --label-length (mm) or --label-height (px)."""
if args.label_length is not None:
return args.label_length * DOTS_PER_MM
return args.label_height
async def do_print(
pc: PrinterClient,
img: Image.Image,
density: int = 1,
paper: int = PAPER_GAP,
copies: int = 1,
dither: bool = True,
max_rows: int = 240,
) -> bool:
img = prepare_image(img, max_rows=max_rows, dither=dither)
rows = img.height
raster = image_to_raster(img)
print(f" Image: {img.width}x{rows}, {len(raster)} bytes, {copies} copies")
await pc.set_density(density)
await asyncio.sleep(DELAY_AFTER_DENSITY)
for copy_num in range(copies):
if copies > 1:
print(f" Copy {copy_num + 1}/{copies}...")
# Check status before each copy (matches decompiled app behaviour)
status = await pc.get_status()
if not status.ok:
raise PrinterNotReady(f"Printer not ready: {status}")
# AiYin print sequence (from decompiled APK)
await pc.set_paper_type(paper)
await asyncio.sleep(DELAY_COMMAND_GAP)
await pc.wakeup()
await asyncio.sleep(DELAY_COMMAND_GAP)
await pc.enable()
await asyncio.sleep(DELAY_COMMAND_GAP)
# Raster image: GS v 0 m xL xH yL yH <data>
yl = rows & 0xFF
yh = (rows >> 8) & 0xFF
header = bytes([0x1D, 0x76, 0x30, 0x00, BYTES_PER_ROW, 0x00, yl, yh])
await pc.send_chunked(header + raster)
await asyncio.sleep(DELAY_RASTER_SETTLE)
await pc.form_feed()
await asyncio.sleep(DELAY_AFTER_FEED)
ok = await pc.stop_print()
if not ok:
print(" WARNING: no OK/0xAA from stop command")
return True
async def cmd_info(args: argparse.Namespace) -> None:
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
info = await pc.get_info()
for k, v in info.items():
print(f" {k}: {v}")
print()
all_info = await pc.get_all_info()
for k, v in all_info.items():
print(f" {k}: {v}")
async def cmd_status(args: argparse.Namespace) -> None:
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
status = await pc.get_status()
print(f" Status: {status}")
print(f" Raw: 0x{status.raw:02X} ({status.raw:08b})")
print(f" printing={status.printing} cover_open={status.cover_open} "
f"no_paper={status.no_paper} low_battery={status.low_battery} "
f"overheated={status.overheated} charging={status.charging}")
async def cmd_text(args: argparse.Namespace) -> None:
text = " ".join(args.text)
label_h = _resolve_label_height(args)
img = text_to_image(text, font_size=args.font_size, label_height=label_h)
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
print(f'Printing "{text}"...')
ok = await do_print(pc, img, args.density, paper=args.paper,
copies=args.copies, dither=False, max_rows=label_h)
print("Done." if ok else "FAILED.")
async def cmd_image(args: argparse.Namespace) -> None:
img = Image.open(args.path)
label_h = _resolve_label_height(args)
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
print(f"Printing {args.path}...")
ok = await do_print(pc, img, args.density, paper=args.paper,
copies=args.copies, dither=not args.no_dither,
max_rows=label_h)
print("Done." if ok else "FAILED.")
async def cmd_set(args: argparse.Namespace) -> None:
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
if args.setting == "density":
val = int(args.value)
if not 0 <= val <= 2:
print(" ERROR: density must be 0, 1, or 2")
return
ok = await pc.set_density(val)
print(f" Set density={args.value}: {'OK' if ok else 'FAILED'}")
elif args.setting == "shutdown":
val = int(args.value)
if not 1 <= val <= 480:
print(" ERROR: shutdown must be 1-480 minutes")
return
ok = await pc.set_shutdown_time(val)
print(f" Set shutdown={args.value}min: {'OK' if ok else 'FAILED'}")
elif args.setting == "paper":
types = {"gap": 0, "black": 1, "continuous": 2}
if args.value in types:
val = types[args.value]
else:
try:
val = int(args.value)
except ValueError:
print(" ERROR: paper must be gap, black, continuous, or 0-2")
return
if not 0 <= val <= 2:
print(" ERROR: paper must be gap, black, continuous, or 0-2")
return
ok = await pc.set_paper_type(val)
print(f" Set paper={args.value}: {'OK' if ok else 'FAILED'}")
def _add_paper_arg(parser: argparse.ArgumentParser) -> None:
"""Add --paper argument to a subparser."""
parser.add_argument(
"--paper", type=str, default="gap",
help="Paper type: gap (default), black, continuous",
)
def _parse_paper(value: str) -> int:
"""Convert paper string/int to protocol value."""
types = {"gap": 0, "black": 1, "continuous": 2}
if value in types:
return types[value]
try:
val = int(value)
if 0 <= val <= 2:
return val
except ValueError:
pass
print(f" WARNING: unknown paper type '{value}', using gap")
return 0
def main() -> None:
parser = argparse.ArgumentParser(description="Fichero D11s Label Printer")
parser.add_argument("--address", default=os.environ.get("FICHERO_ADDR"),
help="BLE address (skip scanning, or set FICHERO_ADDR)")
parser.add_argument("--classic", action="store_true",
default=os.environ.get("FICHERO_TRANSPORT", "").lower() == "classic",
help="Use Classic Bluetooth (RFCOMM) instead of BLE (Linux only, "
"or set FICHERO_TRANSPORT=classic)")
parser.add_argument("--channel", type=int, default=1,
help="RFCOMM channel (default: 1, only used with --classic)")
sub = parser.add_subparsers(dest="command", required=True)
p_info = sub.add_parser("info", help="Show device info")
p_info.set_defaults(func=cmd_info)
p_status = sub.add_parser("status", help="Show detailed status")
p_status.set_defaults(func=cmd_status)
p_text = sub.add_parser("text", help="Print text label")
p_text.add_argument("text", nargs="+", help="Text to print")
p_text.add_argument("--density", type=int, default=2, choices=[0, 1, 2],
help="Print density: 0=light, 1=medium, 2=thick")
p_text.add_argument("--copies", type=int, default=1, help="Number of copies")
p_text.add_argument("--font-size", type=int, default=30, help="Font size in points")
p_text.add_argument("--label-length", type=int, default=None,
help="Label length in mm (default: 30mm)")
p_text.add_argument("--label-height", type=int, default=240,
help="Label height in pixels (default: 240, prefer --label-length)")
_add_paper_arg(p_text)
p_text.set_defaults(func=cmd_text)
p_image = sub.add_parser("image", help="Print image file")
p_image.add_argument("path", help="Path to image file")
p_image.add_argument("--density", type=int, default=2, choices=[0, 1, 2],
help="Print density: 0=light, 1=medium, 2=thick")
p_image.add_argument("--copies", type=int, default=1, help="Number of copies")
p_image.add_argument("--no-dither", action="store_true",
help="Disable Floyd-Steinberg dithering (use simple threshold)")
p_image.add_argument("--label-length", type=int, default=None,
help="Label length in mm (default: 30mm)")
p_image.add_argument("--label-height", type=int, default=240,
help="Max image height in pixels (default: 240, prefer --label-length)")
_add_paper_arg(p_image)
p_image.set_defaults(func=cmd_image)
p_set = sub.add_parser("set", help="Change printer settings")
p_set.add_argument("setting", choices=["density", "shutdown", "paper"],
help="Setting to change")
p_set.add_argument("value", help="New value")
p_set.set_defaults(func=cmd_set)
args = parser.parse_args()
# Resolve --paper string to int for print commands
if hasattr(args, "paper") and isinstance(args.paper, str):
args.paper = _parse_paper(args.paper)
try:
asyncio.run(args.func(args))
except PrinterError as e:
print(f" ERROR: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,97 +0,0 @@
"""Image processing for Fichero D11s thermal label printer."""
import logging
import numpy as np
from PIL import Image, ImageDraw, ImageFont, ImageOps
from fichero.printer import PRINTHEAD_PX
log = logging.getLogger(__name__)
def floyd_steinberg_dither(img: Image.Image) -> Image.Image:
"""Floyd-Steinberg error-diffusion dithering to 1-bit.
Same algorithm as PrinterImageProcessor.ditherFloydSteinberg() in the
decompiled Fichero APK: distributes quantisation error to neighbouring
pixels with weights 7/16, 3/16, 5/16, 1/16.
"""
arr = np.array(img, dtype=np.float32)
h, w = arr.shape
for y in range(h):
for x in range(w):
old = arr[y, x]
new = 0.0 if old < 128 else 255.0
arr[y, x] = new
err = old - new
if x + 1 < w:
arr[y, x + 1] += err * 7 / 16
if y + 1 < h:
if x - 1 >= 0:
arr[y + 1, x - 1] += err * 3 / 16
arr[y + 1, x] += err * 5 / 16
if x + 1 < w:
arr[y + 1, x + 1] += err * 1 / 16
arr = np.clip(arr, 0, 255).astype(np.uint8)
return Image.fromarray(arr, mode="L")
def prepare_image(
img: Image.Image, max_rows: int = 240, dither: bool = True
) -> Image.Image:
"""Convert any image to 96px wide, 1-bit, black on white.
When *dither* is True (default), uses Floyd-Steinberg error diffusion
for better quality on photos and gradients. Set False for crisp text.
"""
img = img.convert("L")
w, h = img.size
new_h = int(h * (PRINTHEAD_PX / w))
img = img.resize((PRINTHEAD_PX, new_h), Image.LANCZOS)
if new_h > max_rows:
log.warning("Image height %dpx exceeds max %dpx, cropping bottom", new_h, max_rows)
img = img.crop((0, 0, PRINTHEAD_PX, max_rows))
img = ImageOps.autocontrast(img, cutoff=1)
if dither:
img = floyd_steinberg_dither(img)
# Pack to 1-bit. PIL mode "1" tobytes() uses 0-bit=black, 1-bit=white,
# but the printer wants 1-bit=black. Mapping dark->1 via point() inverts
# the PIL convention so the final packed bits match what the printer needs.
img = img.point(lambda x: 1 if x < 128 else 0, "1")
return img
def image_to_raster(img: Image.Image) -> bytes:
"""Pack 1-bit image into raw raster bytes, MSB first."""
if img.mode != "1":
raise ValueError(f"Expected mode '1', got '{img.mode}'")
if img.width != PRINTHEAD_PX:
raise ValueError(f"Expected width {PRINTHEAD_PX}, got {img.width}")
return img.tobytes()
def text_to_image(text: str, font_size: int = 30, label_height: int = 240) -> Image.Image:
"""Render crisp 1-bit text, rotated 90 degrees for label printing."""
canvas_w = label_height
canvas_h = PRINTHEAD_PX
img = Image.new("L", (canvas_w, canvas_h), 255)
draw = ImageDraw.Draw(img)
draw.fontmode = "1" # disable antialiasing - pure 1-bit glyph rendering
font = ImageFont.load_default(size=font_size)
bbox = draw.textbbox((0, 0), text, font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
x = (canvas_w - tw) // 2 - bbox[0]
y = (canvas_h - th) // 2 - bbox[1]
draw.text((x, y), text, fill=0, font=font)
img = img.rotate(90, expand=True)
return img

View File

@@ -1,514 +0,0 @@
"""
Fichero / D11s thermal label printer - BLE + Classic Bluetooth interface.
Protocol reverse-engineered from decompiled Fichero APK (com.lj.fichero).
Device class: AiYinNormalDevice (LuckPrinter SDK)
96px wide printhead (12 bytes/row), 203 DPI, prints 1-bit raster images.
"""
import asyncio
import sys
import errno
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from bleak import BleakClient, BleakGATTCharacteristic, BleakScanner
from bleak.exc import BleakDBusError, BleakError
# --- RFCOMM (Classic Bluetooth) support - Linux + Windows (Python 3.9+) ---
_RFCOMM_AVAILABLE = False
if sys.platform in ("linux", "win32"):
import socket as _socket
_RFCOMM_AVAILABLE = hasattr(_socket, "AF_BLUETOOTH")
RFCOMM_CHANNEL = 1
# --- BLE identifiers ---
PRINTER_NAME_PREFIXES = ("FICHERO", "D11s_")
# Using the 18f0 service (any of the four BLE UART services work)
WRITE_UUID = "00002af1-0000-1000-8000-00805f9b34fb"
NOTIFY_UUID = "00002af0-0000-1000-8000-00805f9b34fb"
# --- Printhead ---
PRINTHEAD_PX = 96
BYTES_PER_ROW = PRINTHEAD_PX // 8 # 12
CHUNK_SIZE_BLE = 200 # BLE MTU-limited
CHUNK_SIZE_CLASSIC = 16384 # from decompiled app (C1703d.java), stream-based
# --- Paper types for 10 FF 84 nn ---
PAPER_GAP = 0x00
PAPER_BLACK_MARK = 0x01
PAPER_CONTINUOUS = 0x02
# --- Timing (seconds) - empirically tuned against D11s fw 2.4.6 ---
DELAY_AFTER_DENSITY = 0.10 # printer needs time to apply density setting
DELAY_COMMAND_GAP = 0.05 # minimum gap between sequential commands
DELAY_CHUNK_GAP = 0.02 # inter-chunk pacing for BLE throughput
DELAY_RASTER_SETTLE = 0.50 # wait for printhead after raster transfer
DELAY_AFTER_FEED = 0.30 # wait after form feed before stop command
DELAY_NOTIFY_EXTRA = 0.05 # extra wait for trailing BLE notification fragments
BLE_CONNECT_RETRIES = 3 # retry transient BLE connect failures
BLE_CONNECT_BACKOFF = 0.7 # base backoff in seconds (linear: n * base)
# --- Exceptions ---
class PrinterError(Exception):
"""Base exception for printer operations."""
class PrinterNotFound(PrinterError):
"""No Fichero/D11s printer found during BLE scan."""
class PrinterTimeout(PrinterError):
"""Printer did not respond within the expected time."""
class PrinterNotReady(PrinterError):
"""Printer status indicates it cannot print."""
# --- Discovery ---
async def find_printer() -> str:
"""Scan BLE for a Fichero/D11s printer. Returns the address."""
print("Scanning for printer...")
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.name and any(d.name.startswith(p) for p in PRINTER_NAME_PREFIXES):
print(f" Found {d.name} at {d.address}")
return d.address
raise PrinterNotFound("No Fichero/D11s printer found. Is it turned on?")
async def resolve_ble_target(address: str | None = None):
"""Resolve a BLE target as Bleak device object when possible.
Passing a discovered device object to BleakClient helps BlueZ keep the
correct LE context for dual-mode environments.
"""
if address:
device = await BleakScanner.find_device_by_address(address, timeout=8.0)
if device is not None:
return device
# Fallback to active scan/match before giving up; do not fall back to
# raw address because BlueZ may then attempt BR/EDR and fail with
# br-connection-not-supported.
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.address and d.address.lower() == address.lower():
return d
raise PrinterNotFound(
f"BLE device {address} not found during scan. "
"Ensure printer is on, awake, and in range."
)
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.name and any(d.name.startswith(p) for p in PRINTER_NAME_PREFIXES):
print(f" Found {d.name} at {d.address}")
return d
raise PrinterNotFound("No Fichero/D11s printer found. Is it turned on?")
# --- Status ---
class PrinterStatus:
"""Parsed status byte from 10 FF 40."""
def __init__(self, byte: int):
self.raw = byte
self.printing = bool(byte & 0x01)
self.cover_open = bool(byte & 0x02)
self.no_paper = bool(byte & 0x04)
self.low_battery = bool(byte & 0x08)
self.overheated = bool(byte & 0x10 or byte & 0x40)
self.charging = bool(byte & 0x20)
def __str__(self) -> str:
flags = []
if self.printing:
flags.append("printing")
if self.cover_open:
flags.append("cover open")
if self.no_paper:
flags.append("no paper")
if self.low_battery:
flags.append("low battery")
if self.overheated:
flags.append("overheated")
if self.charging:
flags.append("charging")
return ", ".join(flags) if flags else "ready"
@property
def ok(self) -> bool:
return not (self.cover_open or self.no_paper or self.overheated)
# --- RFCOMM client (duck-types the BleakClient interface) ---
class RFCOMMClient:
"""Classic Bluetooth (RFCOMM) transport. Linux + Windows (Python 3.9+).
Implements the same async context manager + write_gatt_char/start_notify
interface that PrinterClient expects from BleakClient. Zero dependencies
beyond stdlib.
"""
is_classic = True # transport marker for PrinterClient chunk sizing
def __init__(self, address: str, channel: int = RFCOMM_CHANNEL):
self._address = address
self._channel = channel
self._sock: "_socket.socket | None" = None
self._reader_task: asyncio.Task | None = None
async def __aenter__(self) -> "RFCOMMClient":
if not _RFCOMM_AVAILABLE:
raise PrinterError(
"RFCOMM transport requires socket.AF_BLUETOOTH "
"(Linux with BlueZ, or Windows with Python 3.9+). "
"Not available on this platform."
)
import socket as _socket
sock = _socket.socket(
_socket.AF_BLUETOOTH, _socket.SOCK_STREAM, _socket.BTPROTO_RFCOMM
)
loop = asyncio.get_running_loop()
try:
# uvloop's sock_connect path goes through getaddrinfo and doesn't
# support AF_BLUETOOTH addresses reliably. Use direct socket connect
# in a thread instead.
sock.settimeout(10.0)
await loop.run_in_executor(
None,
sock.connect,
(self._address, self._channel),
)
sock.setblocking(False)
except asyncio.TimeoutError as exc:
sock.close()
raise PrinterTimeout(
f"Classic Bluetooth connection timed out to {self._address} (channel {self._channel})."
) from exc
except OSError as exc:
sock.close()
raise PrinterError(
f"Classic Bluetooth connection failed for '{self._address}' (channel {self._channel}): {exc}"
) from exc
except Exception:
sock.close()
raise
self._sock = sock
return self
async def __aexit__(self, *exc) -> None:
if self._reader_task is not None:
self._reader_task.cancel()
try:
await self._reader_task
except asyncio.CancelledError:
pass
self._reader_task = None
if self._sock is not None:
self._sock.close()
self._sock = None
async def write_gatt_char(self, _uuid: str, data: bytes, response: bool = False) -> None:
loop = asyncio.get_running_loop()
await loop.sock_sendall(self._sock, data)
async def start_notify(self, _uuid: str, callback) -> None:
self._reader_task = asyncio.create_task(self._reader_loop(callback))
async def _reader_loop(self, callback) -> None:
loop = asyncio.get_running_loop()
while True:
try:
data = await loop.sock_recv(self._sock, 1024)
except (OSError, asyncio.CancelledError):
return
if not data:
return
callback(None, bytearray(data))
# --- Client ---
class PrinterClient:
def __init__(self, client: BleakClient):
self.client = client
self._buf = bytearray()
self._event = asyncio.Event()
self._lock = asyncio.Lock()
self._is_classic = getattr(client, "is_classic", False)
def _on_notify(self, _char: BleakGATTCharacteristic, data: bytearray) -> None:
self._buf.extend(data)
self._event.set()
async def start(self) -> None:
await self.client.start_notify(NOTIFY_UUID, self._on_notify)
async def send(self, data: bytes, wait: bool = False, timeout: float = 2.0) -> bytes:
async with self._lock:
if wait:
self._buf.clear()
self._event.clear()
await self.client.write_gatt_char(WRITE_UUID, data, response=False)
if wait:
try:
await asyncio.wait_for(self._event.wait(), timeout=timeout)
await asyncio.sleep(DELAY_NOTIFY_EXTRA)
except asyncio.TimeoutError:
raise PrinterTimeout(f"No response within {timeout}s")
return bytes(self._buf)
async def send_chunked(self, data: bytes, chunk_size: int | None = None) -> None:
if chunk_size is None:
chunk_size = CHUNK_SIZE_CLASSIC if self._is_classic else CHUNK_SIZE_BLE
delay = 0 if self._is_classic else DELAY_CHUNK_GAP
async with self._lock:
for i in range(0, len(data), chunk_size):
chunk = data[i : i + chunk_size]
await self.client.write_gatt_char(WRITE_UUID, chunk, response=False)
if delay:
await asyncio.sleep(delay)
# --- Info commands (all tested and confirmed on D11s fw 2.4.6) ---
async def get_model(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF0]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_firmware(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF1]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_serial(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF2]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_boot_version(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xEF]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_battery(self) -> int:
r = await self.send(bytes([0x10, 0xFF, 0x50, 0xF1]), wait=True)
if r and len(r) >= 2:
return r[-1]
return -1
async def get_status(self) -> PrinterStatus:
r = await self.send(bytes([0x10, 0xFF, 0x40]), wait=True)
if r:
return PrinterStatus(r[-1])
return PrinterStatus(0xFF)
async def get_density(self) -> bytes:
r = await self.send(bytes([0x10, 0xFF, 0x11]), wait=True)
return r
async def get_shutdown_time(self) -> int:
"""Returns auto-shutdown timeout in minutes."""
r = await self.send(bytes([0x10, 0xFF, 0x13]), wait=True)
if r and len(r) >= 2:
return (r[0] << 8) | r[1]
return -1
async def get_all_info(self) -> dict:
"""10 FF 70: returns pipe-delimited string with all device info."""
r = await self.send(bytes([0x10, 0xFF, 0x70]), wait=True)
if not r:
return {}
parts = r.decode(errors="replace").split("|")
if len(parts) >= 6:
return {
"bt_name": parts[0],
"mac_classic": parts[1],
"mac_ble": parts[2],
"firmware": parts[3],
"serial": parts[4],
"battery": f"{parts[5]}%",
}
return {"raw": r.decode(errors="replace")}
# --- Config commands (tested on D11s) ---
async def set_density(self, level: int) -> bool:
"""0=light, 1=medium, 2=thick. Returns True if printer responded OK."""
r = await self.send(bytes([0x10, 0xFF, 0x10, 0x00, level]), wait=True)
return r == b"OK"
async def set_paper_type(self, paper: int = PAPER_GAP) -> bool:
"""0=gap/label, 1=black mark, 2=continuous."""
r = await self.send(bytes([0x10, 0xFF, 0x84, paper]), wait=True)
return r == b"OK"
async def set_shutdown_time(self, minutes: int) -> bool:
hi = (minutes >> 8) & 0xFF
lo = minutes & 0xFF
r = await self.send(bytes([0x10, 0xFF, 0x12, hi, lo]), wait=True)
return r == b"OK"
async def factory_reset(self) -> bool:
r = await self.send(bytes([0x10, 0xFF, 0x04]), wait=True)
return r == b"OK"
# --- Print control (AiYin-specific, from decompiled APK) ---
async def wakeup(self) -> None:
await self.send(b"\x00" * 12)
async def enable(self) -> None:
"""AiYin enable: 10 FF FE 01 (NOT 10 FF F1 03)."""
await self.send(bytes([0x10, 0xFF, 0xFE, 0x01]))
async def feed_dots(self, dots: int) -> None:
"""Feed paper forward by n dots."""
await self.send(bytes([0x1B, 0x4A, dots & 0xFF]))
async def form_feed(self) -> None:
"""Position to next label."""
await self.send(bytes([0x1D, 0x0C]))
async def stop_print(self) -> bool:
"""AiYin stop: 10 FF FE 45. Waits for 0xAA or 'OK'."""
r = await self.send(bytes([0x10, 0xFF, 0xFE, 0x45]), wait=True, timeout=60.0)
if r:
return r[0] == 0xAA or r.startswith(b"OK")
return False
async def get_info(self) -> dict:
status = await self.get_status()
return {
"model": await self.get_model(),
"firmware": await self.get_firmware(),
"boot": await self.get_boot_version(),
"serial": await self.get_serial(),
"battery": f"{await self.get_battery()}%",
"status": str(status),
"shutdown": f"{await self.get_shutdown_time()} min",
}
@asynccontextmanager
async def connect(
address: str | None = None,
classic: bool = False,
channel: int = RFCOMM_CHANNEL,
) -> AsyncGenerator[PrinterClient, None]:
"""Discover printer, connect, and yield a ready PrinterClient."""
if classic:
if not address:
raise PrinterError("--address is required for Classic Bluetooth (no scanning)")
# D11s variants are commonly exposed on channel 1 or 3.
candidates = [channel, 1, 2, 3]
channels = [ch for i, ch in enumerate(candidates) if ch > 0 and ch not in candidates[:i]]
last_exc: Exception | None = None
for ch in channels:
try:
async with RFCOMMClient(address, ch) as client:
pc = PrinterClient(client)
await pc.start()
yield pc
return
except (PrinterError, PrinterTimeout) as exc:
# On Linux, a stale BlueZ device state can cause RFCOMM connect()
# to fail with [Errno 12] Out of memory. This is a known quirk.
# We treat this specific error as a signal to fall back to BLE.
if isinstance(exc.__cause__, OSError) and exc.__cause__.errno == errno.ENOMEM:
print(
"Classic Bluetooth connection failed with [Errno 12] Out of memory. "
"Falling back to BLE connection."
)
classic = False # Modify flag to trigger BLE path below
last_exc = exc
break
last_exc = exc
# If the 'classic' flag is still true, it means the loop completed without
# hitting the ENOMEM fallback case, so all classic attempts failed.
if classic:
if last_exc is not None:
raise PrinterError(
f"Classic Bluetooth connection failed for '{address}'. "
f"Tried channels: {channels}. Last error: {last_exc}"
) from last_exc
raise PrinterError(f"Classic Bluetooth connection failed for '{address}'.")
# If classic=False initially, or if it was set to False for the ENOMEM fallback:
if not classic:
target = await resolve_ble_target(address)
def _is_retryable_ble_error(exc: Exception) -> bool:
msg = str(exc).lower()
return any(
token in msg
for token in (
"timeout",
"timed out",
"br-connection-timeout",
"failed to discover services",
"device disconnected",
)
)
last_exc: Exception | None = None
forced_rescan_done = False
for attempt in range(1, BLE_CONNECT_RETRIES + 1):
try:
async with BleakClient(target) as client:
pc = PrinterClient(client)
await pc.start()
yield pc
return
except asyncio.TimeoutError as exc:
last_exc = exc
if attempt < BLE_CONNECT_RETRIES:
await asyncio.sleep(BLE_CONNECT_BACKOFF * attempt)
continue
raise PrinterError(f"BLE connection timed out: {exc}") from exc
except BleakDBusError as exc:
msg = str(exc).lower()
if "br-connection-not-supported" in msg:
last_exc = exc
if not forced_rescan_done:
forced_rescan_done = True
target = await resolve_ble_target(None)
if attempt < BLE_CONNECT_RETRIES:
await asyncio.sleep(BLE_CONNECT_BACKOFF * attempt)
continue
raise PrinterError(
"BLE connection failed (br-connection-not-supported) after LE rescan. "
"Try Classic Bluetooth with classic=true and channel=1."
) from exc
last_exc = exc
if _is_retryable_ble_error(exc) and attempt < BLE_CONNECT_RETRIES:
await asyncio.sleep(BLE_CONNECT_BACKOFF * attempt)
continue
raise PrinterError(f"BLE connection failed: {exc}") from exc
except BleakError as exc:
last_exc = exc
if _is_retryable_ble_error(exc) and attempt < BLE_CONNECT_RETRIES:
await asyncio.sleep(BLE_CONNECT_BACKOFF * attempt)
continue
raise PrinterError(f"BLE error: {exc}") from exc
if last_exc is not None:
raise PrinterError(
f"BLE connection failed after {BLE_CONNECT_RETRIES} attempts: {last_exc}"
) from last_exc
raise PrinterError("BLE connection failed for unknown reason.")

View File

@@ -1,5 +1,24 @@
# Changelog # Changelog
## 0.1.27
- The `fichero` library is now part of the add-on directory, simplifying the build process and removing the need for synchronization scripts.
## 0.1.24
- Fixed Docker build failures by reverting to a vendored code approach. The add-on now expects the `fichero` library to be present within its directory during the build.
## 0.1.23
- Updated `Dockerfile` to install the main library via `pip` instead of copying source files, completing the refactoring to eliminate duplicated code.
## 0.1.22
- Refactored build process to install the main `fichero-printer` library as a package instead of using duplicated source files. This resolves issues with stale code.
## 0.1.21
- Fixed stale source code issue by synchronizing the add-on's internal `fichero` package with the latest library version.
## 0.1.20 ## 0.1.20
- Refactored the embedded web UI to be loaded from an external `index.html` file. - Refactored the embedded web UI to be loaded from an external `index.html` file.

View File

@@ -1,28 +1,32 @@
ARG BUILD_FROM ARG BUILD_FROM
FROM $BUILD_FROM FROM $BUILD_FROM
# Only dbus-dev needed to talk to the HOST BlueZ via D-Bus (host_dbus: true). # Install system dependencies.
# build-base is for compiling Python packages (numpy, pillow).
# dbus-dev is for Bleak to communicate with the host's BlueZ.
# Do NOT install bluez here - we use the host BlueZ, not our own. # Do NOT install bluez here - we use the host BlueZ, not our own.
RUN apk add --no-cache \ RUN apk add --no-cache \
bash \ bash \
python3 \ python3 \
py3-pip \ py3-pip \
py3-numpy \ dbus-dev \
py3-pillow \ build-base
dbus-dev
# Pure-Python packages (bleak uses dbus-fast internally, no C compiler needed) # Install Python dependencies from pip.
# We cannot use `pip install .` from pyproject.toml as it's outside the build context.
RUN pip3 install --no-cache-dir --break-system-packages \ RUN pip3 install --no-cache-dir --break-system-packages \
"bleak>=0.21" \ "bleak" \
"fastapi>=0.111" \ "numpy" \
"uvicorn[standard]>=0.29" \ "Pillow" \
"fastapi" \
"uvicorn[standard]" \
"python-multipart>=0.0.9" "python-multipart>=0.0.9"
# Copy the fichero Python package into the container # Copy the application code into the container.
WORKDIR /app WORKDIR /app
COPY fichero/ /app/fichero/ COPY fichero/ /app/fichero/
# Make the package importable without installation # Make the 'fichero' package importable.
ENV PYTHONPATH=/app ENV PYTHONPATH=/app
# Copy startup script and normalise line endings (Windows CRLF -> LF) # Copy startup script and normalise line endings (Windows CRLF -> LF)

View File

@@ -1,5 +1,5 @@
name: "Fichero Printer" name: "Fichero Printer"
version: "0.1.15" version: "0.1.27"
slug: "fichero_printer" slug: "fichero_printer"
description: "REST API for the Fichero D11s (AiYin) thermal label printer over Bluetooth" description: "REST API for the Fichero D11s (AiYin) thermal label printer over Bluetooth"
url: "https://git.leuschner.dev/Tobias/Fichero" url: "https://git.leuschner.dev/Tobias/Fichero"

View File

@@ -5,6 +5,7 @@ Start with:
or: or:
python -m fichero.api python -m fichero.api
Endpoints: Endpoints:
GET /status Printer status GET /status Printer status
GET /info Printer info (model, firmware, battery, …) GET /info Printer info (model, firmware, battery, …)
@@ -17,8 +18,10 @@ from __future__ import annotations
import argparse import argparse
import asyncio import asyncio
import io import io
import re
import os import os
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from pathlib import Path
from typing import Annotated from typing import Annotated
from fastapi import FastAPI, File, Form, HTTPException, UploadFile from fastapi import FastAPI, File, Form, HTTPException, UploadFile
@@ -35,7 +38,6 @@ from fichero.printer import (
PrinterNotFound, PrinterNotFound,
PrinterTimeout, PrinterTimeout,
connect, connect,
find_printer,
) )
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -73,7 +75,7 @@ async def lifespan(app: FastAPI): # noqa: ARG001
app = FastAPI( app = FastAPI(
title="Fichero Printer API", title="Fichero Printer API",
description="REST API for the Fichero D11s (AiYin) thermal label printer.", description="REST API for the Fichero D11s (AiYin) thermal label printer.",
version="0.1.13", version="0.1.26",
lifespan=lifespan, lifespan=lifespan,
docs_url=None, docs_url=None,
redoc_url=None, redoc_url=None,
@@ -95,501 +97,20 @@ def _address(address: str | None) -> str | None:
def _ui_html() -> str: def _ui_html() -> str:
default_address = _DEFAULT_ADDRESS or "" default_address = _DEFAULT_ADDRESS or ""
default_transport = "classic" if _DEFAULT_CLASSIC else "ble" default_transport = "classic" if _DEFAULT_CLASSIC else "ble"
return f"""<!doctype html>
<html lang="en" data-theme="dark">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Fichero Printer</title>
<style>
*, *::before, *::after {{ box-sizing: border-box; margin: 0; padding: 0; }}
:root {{ try:
--s0: #161819; template_path = Path(__file__).parent / "index.html"
--s1: #1c1e20; template = template_path.read_text(encoding="utf-8")
--s2: #232628; except FileNotFoundError:
--s3: #2a2d30; return "<h1>Error: index.html not found</h1>"
--ink: #e4e7eb;
--muted: #949ba4;
--dim: #5d6670;
--brand: #07addf;
--brand-h: #0699c7;
--brand-glow: rgba(7,173,223,.18);
--ok: #6bb88a;
--warn: #d4a24c;
--err: #d45454;
--border: rgba(228,231,235,.08);
--border-e: rgba(228,231,235,.14);
--ctrl-bg: #141617;
--ctrl-border: rgba(228,231,235,.10);
--r: 10px;
--r-lg: 14px;
}}
html {{ height: 100%; }} # Simple substitution for initial values
body {{ return (
min-height: 100dvh; template.replace("{default_address}", default_address)
font-family: "Inter", ui-sans-serif, system-ui, sans-serif; .replace("{ble_selected}", " selected" if default_transport == "ble" else "")
font-size: 14px; .replace("{classic_selected}", " selected" if default_transport == "classic" else "")
line-height: 1.5; .replace("{default_channel}", str(_DEFAULT_CHANNEL))
color: var(--ink); )
background-color: var(--s0);
background-image:
radial-gradient(ellipse at 10% 90%, rgba(7,173,223,.07) 0%, transparent 55%),
radial-gradient(ellipse at 90% 5%, rgba(7,173,223,.04) 0%, transparent 55%);
background-attachment: fixed;
}}
/* ── Header ─────────────────────────────────────── */
header {{
position: sticky;
top: 0;
z-index: 100;
padding: 10px 20px;
display: flex;
align-items: center;
gap: 12px;
background: rgba(22,24,25,.82);
backdrop-filter: blur(16px) saturate(1.5);
-webkit-backdrop-filter: blur(16px) saturate(1.5);
border-bottom: 1px solid var(--border);
}}
.brand {{ font-size: 1.05rem; font-weight: 700; letter-spacing: -.025em; }}
.brand em {{ color: var(--brand); font-style: normal; }}
.header-right {{ margin-left: auto; display: flex; align-items: center; gap: 8px; }}
.status-pill {{
display: flex;
align-items: center;
gap: 6px;
padding: 4px 10px;
border-radius: 99px;
font-size: .78rem;
font-weight: 600;
background: var(--s2);
border: 1px solid var(--border-e);
color: var(--muted);
transition: .25s;
cursor: default;
white-space: nowrap;
}}
.status-pill .dot {{
width: 7px; height: 7px;
border-radius: 50%;
background: var(--dim);
transition: .25s;
}}
.status-pill.ok {{ color: var(--ok); border-color: rgba(107,184,138,.3); }}
.status-pill.ok .dot {{ background: var(--ok); box-shadow: 0 0 6px var(--ok); }}
.status-pill.err {{ color: var(--err); border-color: rgba(212,84,84,.3); }}
.status-pill.err .dot {{ background: var(--err); box-shadow: 0 0 6px var(--err); }}
/* ── Layout ─────────────────────────────────────── */
main {{
max-width: 1000px;
margin: 0 auto;
padding: 24px 16px 60px;
display: grid;
gap: 16px;
grid-template-columns: repeat(auto-fit, minmax(290px, 1fr));
}}
main > .full {{ grid-column: 1 / -1; }}
/* ── Cards ──────────────────────────────────────── */
.card {{
padding: 20px;
background: var(--s1);
border: 1px solid var(--border);
border-radius: var(--r-lg);
box-shadow: 0 8px 32px rgba(0,0,0,.3);
}}
.card-title {{
font-size: .7rem;
font-weight: 700;
letter-spacing: .1em;
text-transform: uppercase;
color: var(--dim);
margin-bottom: 14px;
}}
/* ── Form ───────────────────────────────────────── */
label {{
display: block;
font-size: .8rem;
font-weight: 600;
color: var(--muted);
margin: 12px 0 5px;
}}
label:first-child {{ margin-top: 0; }}
input, select, textarea {{
width: 100%;
padding: 8px 11px;
border-radius: var(--r);
border: 1px solid var(--ctrl-border);
background: var(--ctrl-bg);
color: var(--ink);
font: inherit;
transition: border-color .15s, box-shadow .15s;
outline: none;
}}
input:focus, select:focus, textarea:focus {{
border-color: var(--brand);
box-shadow: 0 0 0 3px rgba(7,173,223,.2);
}}
textarea {{ min-height: 90px; resize: vertical; }}
.two-col {{ display: grid; gap: 10px; grid-template-columns: 1fr 1fr; }}
.check-row {{ display: flex; align-items: center; gap: 8px; margin-top: 14px; }}
.check-row input[type=checkbox] {{ width: 15px; height: 15px; accent-color: var(--brand); }}
.check-row label {{ margin: 0; font-size: .85rem; color: var(--ink); }}
/* ── Buttons ────────────────────────────────────── */
.btn {{
display: inline-flex;
align-items: center;
justify-content: center;
gap: 6px;
padding: 8px 16px;
border-radius: var(--r);
border: none;
font: inherit;
font-size: .85rem;
font-weight: 600;
cursor: pointer;
transition: background .15s, box-shadow .15s, opacity .15s;
}}
.btn:disabled {{ opacity: .5; cursor: not-allowed; }}
.btn-primary {{
background: var(--brand);
color: #fff;
}}
.btn-primary:not(:disabled):hover {{
background: var(--brand-h);
box-shadow: 0 0 0 3px var(--brand-glow);
}}
.btn-secondary {{
background: var(--s3);
color: var(--ink);
border: 1px solid var(--border-e);
}}
.btn-secondary:not(:disabled):hover {{
background: var(--s3);
border-color: var(--brand);
}}
.btn-group {{ display: flex; gap: 8px; flex-wrap: wrap; margin-top: 14px; }}
.btn-group .btn {{ flex: 1; min-width: 110px; }}
/* ── Address row ────────────────────────────────── */
.addr-row {{ display: flex; gap: 8px; }}
.addr-row input {{ flex: 1; min-width: 0; }}
.addr-row .btn {{ padding: 8px 12px; white-space: nowrap; flex-shrink: 0; }}
/* ── Output / pre ───────────────────────────────── */
pre {{
background: var(--s0);
border: 1px solid var(--border);
border-radius: var(--r);
color: var(--muted);
font-family: "JetBrains Mono", "Fira Code", ui-monospace, monospace;
font-size: .78rem;
line-height: 1.6;
padding: 14px;
min-height: 130px;
overflow: auto;
white-space: pre-wrap;
word-break: break-all;
}}
pre.ok {{ color: var(--ok); border-color: rgba(107,184,138,.2); }}
pre.err {{ color: var(--err); border-color: rgba(212,84,84,.2); }}
/* ── Spinner ────────────────────────────────────── */
@keyframes spin {{ to {{ transform: rotate(360deg); }} }}
.spinner {{
display: inline-block;
width: 13px; height: 13px;
border: 2px solid rgba(255,255,255,.3);
border-top-color: #fff;
border-radius: 50%;
animation: spin .65s linear infinite;
}}
/* ── Footer ─────────────────────────────────────── */
footer {{
text-align: center;
padding: 16px;
font-size: .72rem;
color: var(--dim);
}}
footer a {{ color: var(--dim); text-decoration: none; }}
footer a:hover {{ color: var(--brand); }}
@media (max-width: 520px) {{
.two-col {{ grid-template-columns: 1fr; }}
.btn-group .btn {{ min-width: 0; }}
}}
</style>
</head>
<body>
<header>
<span class="brand">Fichero<em>Printer</em></span>
<div class="header-right">
<span class="status-pill" id="status-pill">
<span class="dot"></span>
<span id="status-text">Unknown</span>
</span>
<button class="btn btn-secondary" style="padding:6px 10px;font-size:.78rem" onclick="refreshStatus()">↻</button>
</div>
</header>
<main>
<!-- Connection card -->
<div class="card">
<p class="card-title">Connection</p>
<label for="address">Bluetooth address</label>
<div class="addr-row">
<input id="address" value="{default_address}" placeholder="AA:BB:CC:DD:EE:FF">
<button class="btn btn-secondary" id="scan-btn" onclick="scanAddress()">
<span id="scan-icon">⟳</span> Scan
</button>
</div>
<div class="two-col">
<div>
<label for="transport">Transport</label>
<select id="transport">
<option value="ble"{" selected" if default_transport == "ble" else ""}>BLE</option>
<option value="classic"{" selected" if default_transport == "classic" else ""}>Classic BT</option>
</select>
</div>
<div>
<label for="channel">RFCOMM channel</label>
<input id="channel" type="number" min="1" max="30" value="{_DEFAULT_CHANNEL}">
</div>
</div>
<div class="btn-group">
<button class="btn btn-secondary" onclick="runGet('status')">Status</button>
<button class="btn btn-secondary" onclick="runGet('info')">Device info</button>
</div>
</div>
<!-- Output card -->
<div class="card">
<p class="card-title">Response</p>
<pre id="output">Waiting for a command…</pre>
</div>
<!-- Print text card -->
<div class="card">
<p class="card-title">Print text</p>
<label for="text">Text</label>
<textarea id="text" placeholder="Hello from Home Assistant"></textarea>
<div class="two-col">
<div>
<label for="text_density">Density</label>
<select id="text_density">
<option value="0">0 Light</option>
<option value="1">1 Medium</option>
<option value="2" selected>2 Dark</option>
</select>
</div>
<div>
<label for="text_copies">Copies</label>
<input id="text_copies" type="number" min="1" max="99" value="1">
</div>
</div>
<div class="two-col">
<div>
<label for="text_font_size">Font size (pt)</label>
<input id="text_font_size" type="number" min="6" max="200" value="30">
</div>
<div>
<label for="text_label_length">Label length (mm)</label>
<input id="text_label_length" type="number" min="5" max="500" value="30">
</div>
</div>
<label for="text_paper">Paper type</label>
<select id="text_paper">
<option value="gap" selected>Gap / label</option>
<option value="black">Black mark</option>
<option value="continuous">Continuous</option>
</select>
<div class="btn-group">
<button class="btn btn-primary" onclick="printText()">🖨 Print text</button>
</div>
</div>
<!-- Print image card -->
<div class="card">
<p class="card-title">Print image</p>
<label for="image_file">Image file (PNG / JPEG / BMP / WEBP)</label>
<input id="image_file" type="file" accept="image/*">
<div class="two-col">
<div>
<label for="image_density">Density</label>
<select id="image_density">
<option value="0">0 Light</option>
<option value="1">1 Medium</option>
<option value="2" selected>2 Dark</option>
</select>
</div>
<div>
<label for="image_copies">Copies</label>
<input id="image_copies" type="number" min="1" max="99" value="1">
</div>
</div>
<div class="two-col">
<div>
<label for="image_label_length">Label length (mm)</label>
<input id="image_label_length" type="number" min="5" max="500" value="30">
</div>
<div>
<label for="image_paper">Paper type</label>
<select id="image_paper">
<option value="gap" selected>Gap / label</option>
<option value="black">Black mark</option>
<option value="continuous">Continuous</option>
</select>
</div>
</div>
<div class="check-row">
<input id="image_dither" type="checkbox" checked>
<label for="image_dither">Floyd-Steinberg dithering</label>
</div>
<div class="btn-group">
<button class="btn btn-primary" onclick="printImage()">🖨 Print image</button>
</div>
</div>
</main>
<footer>
Fichero Printer REST API &nbsp;·&nbsp;
<a href="docs">Swagger UI</a>
</footer>
<script>
// ── Helpers ──────────────────────────────────────────
function commonParams() {{
const address = document.getElementById("address").value.trim();
const classic = document.getElementById("transport").value === "classic";
const channel = document.getElementById("channel").value;
const p = new URLSearchParams();
if (address) p.set("address", address);
p.set("classic", String(classic));
p.set("channel", channel);
return p;
}}
function setOutput(obj, isOk) {{
const pre = document.getElementById("output");
pre.textContent = JSON.stringify(obj, null, 2);
pre.className = isOk ? "ok" : "err";
}}
async function showResponse(response) {{
let data;
try {{ data = await response.json(); }} catch {{ data = {{ detail: await response.text() }}; }}
setOutput({{ status: response.status, ok: response.ok, data }}, response.ok);
}}
// ── Status pill ──────────────────────────────────────
async function refreshStatus() {{
try {{
const r = await fetch("status?" + commonParams().toString());
const d = await r.json();
const pill = document.getElementById("status-pill");
const txt = document.getElementById("status-text");
if (r.ok && d.ok) {{
pill.className = "status-pill ok";
txt.textContent = d.no_paper ? "No paper" : d.charging ? "Charging" : "Ready";
}} else {{
pill.className = "status-pill err";
txt.textContent = r.ok ? "Not ready" : "Not found";
}}
}} catch {{ /* ignore */ }}
}}
// auto-refresh status on load
window.addEventListener("DOMContentLoaded", refreshStatus);
// ── Scan ─────────────────────────────────────────────
async function scanAddress() {{
const btn = document.getElementById("scan-btn");
const icon = document.getElementById("scan-icon");
btn.disabled = true;
icon.outerHTML = '<span id="scan-icon" class="spinner"></span>';
setOutput({{ message: "Scanning for printer (up to 8 s)…" }}, true);
try {{
const r = await fetch("scan");
const d = await r.json();
if (r.ok && d.address) {{
document.getElementById("address").value = d.address;
}}
setOutput({{ status: r.status, ok: r.ok, data: d }}, r.ok);
}} catch (e) {{
setOutput({{ error: String(e) }}, false);
}} finally {{
btn.disabled = false;
document.getElementById("scan-icon").outerHTML = '<span id="scan-icon">⟳</span>';
}}
}}
async function runGet(path) {{
setOutput({{ message: "Loading…" }}, true);
const r = await fetch(path + "?" + commonParams().toString());
await showResponse(r);
}}
// ── Print text ───────────────────────────────────────
async function printText() {{
const form = new FormData();
form.set("text", document.getElementById("text").value);
form.set("density", document.getElementById("text_density").value);
form.set("copies", document.getElementById("text_copies").value);
form.set("font_size", document.getElementById("text_font_size").value);
form.set("label_length", document.getElementById("text_label_length").value);
form.set("paper", document.getElementById("text_paper").value);
form.set("address", document.getElementById("address").value.trim());
form.set("classic", String(document.getElementById("transport").value === "classic"));
form.set("channel", document.getElementById("channel").value);
setOutput({{ message: "Sending to printer…" }}, true);
const r = await fetch("print/text", {{ method: "POST", body: form }});
await showResponse(r);
if (r.ok) refreshStatus();
}}
// ── Print image ──────────────────────────────────────
async function printImage() {{
const fi = document.getElementById("image_file");
if (!fi.files.length) {{ setOutput({{ error: "Select an image file first." }}, false); return; }}
const form = new FormData();
form.set("file", fi.files[0]);
form.set("density", document.getElementById("image_density").value);
form.set("copies", document.getElementById("image_copies").value);
form.set("label_length", document.getElementById("image_label_length").value);
form.set("paper", document.getElementById("image_paper").value);
form.set("dither", String(document.getElementById("image_dither").checked));
form.set("address", document.getElementById("address").value.trim());
form.set("classic", String(document.getElementById("transport").value === "classic"));
form.set("channel", document.getElementById("channel").value);
setOutput({{ message: "Sending to printer…" }}, true);
const r = await fetch("print/image", {{ method: "POST", body: form }});
await showResponse(r);
if (r.ok) refreshStatus();
}}
</script>
</body>
</html>"""
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -611,22 +132,6 @@ async def docs():
) )
@app.get(
"/scan",
summary="Scan for printer",
response_description="BLE address of the discovered printer",
)
async def scan_printer():
"""Scan for a Fichero/D11s printer via BLE and return its address."""
try:
address = await find_printer()
except PrinterNotFound as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except Exception as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
return {"address": address}
@app.get( @app.get(
"/status", "/status",
summary="Get printer status", summary="Get printer status",
@@ -685,6 +190,90 @@ async def get_info(
return info return info
@app.post(
"/pair",
summary="Pair and trust a Bluetooth device",
status_code=200,
)
async def pair_device(
address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None,
):
"""
Attempt to pair and trust the device using `bluetoothctl`.
This is intended for setting up Classic Bluetooth connections.
"""
addr = _address(address)
if not addr:
raise HTTPException(status_code=422, detail="Address is required to pair.")
# Basic validation for MAC address format to mitigate injection risk.
if not re.match(r"^[0-9A-F]{2}(:[0-9A-F]{2}){5}$", addr, re.IGNORECASE):
raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}")
cmd = f'echo -e "pair {addr}\\ntrust {addr}\\nquit" | bluetoothctl'
try:
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15.0)
except FileNotFoundError:
raise HTTPException(status_code=500, detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?")
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="`bluetoothctl` command timed out after 15 seconds.")
output = stdout.decode(errors="ignore")
error = stderr.decode(errors="ignore")
if "Failed to pair" in output or "not available" in output.lower():
raise HTTPException(status_code=502, detail=f"Pairing failed. Output: {output}. Error: {error}")
return {"ok": True, "message": "Pair/trust command sent. Check output for details.", "output": output, "error": error}
@app.post(
"/unpair",
summary="Unpair a Bluetooth device",
status_code=200,
)
async def unpair_device(
address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None,
):
"""
Attempt to unpair the device using `bluetoothctl`.
"""
addr = _address(address)
if not addr:
raise HTTPException(status_code=422, detail="Address is required to unpair.")
# Basic validation for MAC address format to mitigate injection risk.
if not re.match(r"^[0-9A-F]{2}(:[0-9A-F]{2}){5}$", addr, re.IGNORECASE):
raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}")
cmd = f'echo -e "remove {addr}\\nquit" | bluetoothctl'
try:
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15.0)
except FileNotFoundError:
raise HTTPException(status_code=500, detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?")
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="`bluetoothctl` command timed out after 15 seconds.")
output = stdout.decode(errors="ignore")
error = stderr.decode(errors="ignore")
if "Failed to remove" in output or "not available" in output.lower():
raise HTTPException(status_code=502, detail=f"Unpairing failed. Output: {output}. Error: {error}")
return {"ok": True, "message": "Unpair command sent. Check output for details.", "output": output, "error": error}
@app.post( @app.post(
"/print/text", "/print/text",
summary="Print a text label", summary="Print a text label",

View File

@@ -8,6 +8,7 @@ Device class: AiYinNormalDevice (LuckPrinter SDK)
import asyncio import asyncio
import sys import sys
import errno
from collections.abc import AsyncGenerator from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
@@ -427,14 +428,31 @@ async def connect(
yield pc yield pc
return return
except (PrinterError, PrinterTimeout) as exc: except (PrinterError, PrinterTimeout) as exc:
# On Linux, a stale BlueZ device state can cause RFCOMM connect()
# to fail with [Errno 12] Out of memory. This is a known quirk.
# We treat this specific error as a signal to fall back to BLE.
if isinstance(exc.__cause__, OSError) and exc.__cause__.errno == errno.ENOMEM:
print(
"Classic Bluetooth connection failed with [Errno 12] Out of memory. "
"Falling back to BLE connection."
)
classic = False # Modify flag to trigger BLE path below
last_exc = exc
break
last_exc = exc last_exc = exc
if last_exc is not None:
raise PrinterError( # If the 'classic' flag is still true, it means the loop completed without
f"Classic Bluetooth connection failed for '{address}'. " # hitting the ENOMEM fallback case, so all classic attempts failed.
f"Tried channels: {channels}. Last error: {last_exc}" if classic:
) from last_exc if last_exc is not None:
raise PrinterError(f"Classic Bluetooth connection failed for '{address}'.") raise PrinterError(
else: f"Classic Bluetooth connection failed for '{address}'. "
f"Tried channels: {channels}. Last error: {last_exc}"
) from last_exc
raise PrinterError(f"Classic Bluetooth connection failed for '{address}'.")
# If classic=False initially, or if it was set to False for the ENOMEM fallback:
if not classic:
target = await resolve_ble_target(address) target = await resolve_ble_target(address)
def _is_retryable_ble_error(exc: Exception) -> bool: def _is_retryable_ble_error(exc: Exception) -> bool:
msg = str(exc).lower() msg = str(exc).lower()

View File

@@ -0,0 +1,13 @@
configuration:
port:
name: "API-Port"
description: "Port des REST-API-Servers. Den obigen Port-Mapping-Eintrag entsprechend anpassen."
ble_address:
name: "Bluetooth-Adresse"
description: "Feste BLE-Adresse des Druckers (z.B. AA:BB:CC:DD:EE:FF). Leer lassen für automatischen Scan."
transport:
name: "Transport"
description: "Verbindungsart: 'ble' für Bluetooth Low Energy (Standard) oder 'classic' für RFCOMM."
channel:
name: "RFCOMM-Kanal"
description: "Classic-Bluetooth-RFCOMM-Kanal. Nur relevant wenn Transport auf 'classic' gesetzt ist."

View File

@@ -1,13 +1,13 @@
configuration: configuration:
port: port:
name: "API-Port" name: "API Port"
description: "Port des REST-API-Servers. Den obigen Port-Mapping-Eintrag entsprechend anpassen." description: "Port for the REST API server. Adjust the port mapping entry above accordingly."
ble_address: ble_address:
name: "Bluetooth-Adresse" name: "Bluetooth Address"
description: "Feste BLE-Adresse des Druckers (z.B. AA:BB:CC:DD:EE:FF). Leer lassen für automatischen Scan." description: "Fixed BLE address of the printer (e.g., AA:BB:CC:DD:EE:FF). Leave empty for automatic scan."
transport: transport:
name: "Transport" name: "Transport"
description: "Verbindungsart: 'ble' für Bluetooth Low Energy (Standard) oder 'classic' für RFCOMM." description: "Connection type: 'ble' for Bluetooth Low Energy (default) or 'classic' for RFCOMM."
channel: channel:
name: "RFCOMM-Kanal" name: "RFCOMM Channel"
description: "Classic-Bluetooth-RFCOMM-Kanal. Nur relevant wenn Transport auf 'classic' gesetzt ist." description: "Classic Bluetooth RFCOMM channel. Only relevant if transport is set to 'classic'."

View File

@@ -1,28 +1,31 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project] [project]
name = "fichero-printer" name = "fichero-printer"
version = "0.1.15" version = "0.1.27"
description = "Fichero D11s thermal label printer - BLE CLI tool" description = "Web GUI, Python CLI, and protocol documentation for the Fichero D11s thermal label printer."
readme = "README.md"
requires-python = ">=3.10" requires-python = ">=3.10"
license = {text = "MIT"}
authors = [
{name = "0xMH"},
{name = "Paul Kozber"},
]
dependencies = [ dependencies = [
"bleak", "bleak",
"numpy", "numpy",
"pillow", "Pillow",
] "fastapi",
"uvicorn[standard]",
[project.optional-dependencies]
api = [
"fastapi>=0.111",
"uvicorn[standard]>=0.29",
"python-multipart>=0.0.9", "python-multipart>=0.0.9",
] ]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["fichero"]
[project.scripts] [project.scripts]
fichero = "fichero.cli:main" fichero = "fichero.cli:main"
fichero-server = "fichero.api:main" fichero-server = "fichero.api:main"
[tool.setuptools.packages.find]
where = ["fichero_printer"]
include = ["fichero*"]

0
sync_addon.sh Normal file
View File