feat: Add Fichero D11s thermal label printer support with REST API and CLI
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled

- Implemented a new module for the Fichero D11s thermal label printer, including BLE and Classic Bluetooth interfaces.
- Created a REST API using FastAPI to manage printer status, info, and printing tasks (text and images).
- Developed a CLI for direct printer interaction, allowing users to print text and images, check status, and modify settings.
- Added image processing capabilities for converting text and images to the required format for printing.
- Introduced error handling for printer operations and connection management.
- Included a shell script for running the API server with configurable parameters.
- Added English translations for configuration options.
- Created a repository metadata file for project management.
This commit is contained in:
2026-03-07 11:52:11 +01:00
parent 43495714e6
commit 14be205eb1
13 changed files with 1318 additions and 1 deletions

144
fichero_printer/DOCS.md Normal file
View File

@@ -0,0 +1,144 @@
# Fichero Printer Home Assistant Add-on
Ein HTTP-REST-API-Server für den **Fichero D11s** (auch bekannt als AiYin D11s)
Thermodrucker. Das Add-on ermöglicht das Drucken von Textetiketten und Bildern
direkt aus Home Assistant-Automationen, Skripten oder externen Anwendungen.
## Voraussetzungen
- Fichero D11s / AiYin D11s Drucker
- Ein Bluetooth-Adapter, der vom Home Assistant OS erkannt wird
- Der Drucker muss eingeschaltet und in Reichweite sein
## Konfiguration
| Option | Standard | Beschreibung |
|---|---|---|
| `port` | `8765` | Port des REST-API-Servers (auch im „Port-Mapping" oben anpassen) |
| `ble_address` | _(leer)_ | Feste BLE-Adresse des Druckers (z.B. `AA:BB:CC:DD:EE:FF`). Leer lassen für automatischen Scan. |
| `transport` | `ble` | Verbindungsart: `ble` (Bluetooth Low Energy) oder `classic` (RFCOMM) |
| `channel` | `1` | RFCOMM-Kanal nur relevant bei `transport: classic` |
## Verwendung
Nach dem Start ist die API unter `http://<HA-IP>:<port>` erreichbar.
Die interaktive Swagger-Dokumentation ist unter `http://<HA-IP>:<port>/docs` verfügbar.
### Endpunkte
#### `GET /status`
Gibt den aktuellen Druckerstatus zurück.
```bash
curl http://homeassistant.local:8765/status
```
Antwort:
```json
{
"ok": true,
"printing": false,
"cover_open": false,
"no_paper": false,
"low_battery": false,
"overheated": false,
"charging": false,
"raw": 0
}
```
#### `GET /info`
Gibt Geräteinformationen zurück (Modell, Firmware, Seriennummer, Akkustand).
```bash
curl http://homeassistant.local:8765/info
```
#### `POST /print/text`
Druckt ein Textetikett.
```bash
curl -X POST http://homeassistant.local:8765/print/text \
-F text="Hallo Welt" \
-F density=2 \
-F paper=gap \
-F label_length=30
```
| Feld | Standard | Beschreibung |
|---|---|---|
| `text` | | **Pflichtfeld.** Zu druckender Text |
| `density` | `2` | Druckdichte: `0`=hell, `1`=mittel, `2`=dunkel |
| `paper` | `gap` | Papierart: `gap`, `black`, `continuous` |
| `copies` | `1` | Anzahl der Kopien (199) |
| `font_size` | `30` | Schriftgröße in Punkt |
| `label_length` | | Etikettenlänge in mm (überschreibt `label_height`) |
| `label_height` | `240` | Etikettenhöhe in Pixel |
#### `POST /print/image`
Druckt eine Bilddatei (PNG, JPEG, BMP, GIF, TIFF, WEBP).
```bash
curl -X POST http://homeassistant.local:8765/print/image \
-F file=@etikett.png \
-F density=2 \
-F dither=true \
-F label_length=40
```
| Feld | Standard | Beschreibung |
|---|---|---|
| `file` | | **Pflichtfeld.** Bilddatei |
| `density` | `2` | Druckdichte: `0`=hell, `1`=mittel, `2`=dunkel |
| `paper` | `gap` | Papierart: `gap`, `black`, `continuous` |
| `copies` | `1` | Anzahl der Kopien (199) |
| `dither` | `true` | Floyd-Steinberg-Dithering aktivieren |
| `label_length` | | Max. Etikettenlänge in mm |
| `label_height` | `240` | Max. Etikettenhöhe in Pixel |
### Fehlercodes
| Status | Bedeutung |
|---|---|
| `404` | Drucker nicht gefunden (BLE-Scan fehlgeschlagen oder Adresse ungültig) |
| `422` | Ungültige Parameter oder leere Datei |
| `502` | Kommunikationsfehler mit dem Drucker |
| `504` | Drucker hat nicht rechtzeitig geantwortet |
## Home Assistant Automation Beispiel
```yaml
alias: Etikett drucken
trigger:
- platform: state
entity_id: input_text.etikett_text
action:
- service: rest_command.fichero_print_text
data:
text: "{{ states('input_text.etikett_text') }}"
```
In `configuration.yaml`:
```yaml
rest_command:
fichero_print_text:
url: "http://localhost:8765/print/text"
method: POST
content_type: "application/x-www-form-urlencoded"
payload: "text={{ text }}&density=2&label_length=30"
```
## Hinweise zur Bluetooth-Verbindung
- **BLE (Standard):** Das Add-on benötigt Zugriff auf BlueZ über D-Bus
(`host_dbus: true`). Home Assistant OS stellt BlueZ automatisch bereit.
- **Classic Bluetooth (RFCOMM):** Nur unter Linux verfügbar. Erfordert die
direkte Bluetooth-Adresse (kein automatischer Scan möglich).
- Wenn die BLE-Adresse bekannt ist, diese in der Konfiguration eintragen
das beschleunigt den Verbindungsaufbau erheblich (kein Scan nötig).
- Der Drucker muss eingeschaltet sein, bevor eine Anfrage gestellt wird.
Es gibt keine persistente Verbindung jede Anfrage verbindet sich neu.

View File

@@ -0,0 +1,30 @@
ARG BUILD_FROM
FROM $BUILD_FROM
# Install Bluetooth system libraries (BlueZ for BLE/RFCOMM)
RUN apk add --no-cache \
bluez \
bluez-deprecated \
dbus
# Install Python runtime dependencies
RUN pip3 install --no-cache-dir --break-system-packages \
"bleak>=0.21" \
"numpy" \
"pillow" \
"fastapi>=0.111" \
"uvicorn[standard]>=0.29" \
"python-multipart>=0.0.9"
# Copy the fichero Python package into the container
WORKDIR /app
COPY fichero/ /app/fichero/
# Make the package importable without installation
ENV PYTHONPATH=/app
# Copy and register the startup script
COPY run.sh /usr/bin/run.sh
RUN chmod +x /usr/bin/run.sh
CMD ["/usr/bin/run.sh"]

View File

@@ -0,0 +1,6 @@
build_from:
aarch64: "ghcr.io/home-assistant/aarch64-base-python:3.13"
amd64: "ghcr.io/home-assistant/amd64-base-python:3.13"
armhf: "ghcr.io/home-assistant/armhf-base-python:3.13"
armv7: "ghcr.io/home-assistant/armv7-base-python:3.13"
i386: "ghcr.io/home-assistant/i386-base-python:3.13"

View File

@@ -0,0 +1,42 @@
name: "Fichero Printer"
version: "0.1.0"
slug: "fichero_printer"
description: "REST API for the Fichero D11s (AiYin) thermal label printer over Bluetooth"
url: "https://git.leuschner.dev/Tobias/Fichero"
arch:
- aarch64
- amd64
- armhf
- armv7
- i386
init: false
startup: application
boot: auto
host_network: true
host_dbus: true
# NET_ADMIN is required for Classic Bluetooth (RFCOMM) raw socket access.
# BLE uses D-Bus (host_dbus) and does not need this.
privileged:
- NET_ADMIN
options:
port: 8765
ble_address: ""
transport: "ble"
channel: 1
schema:
port: int
ble_address: str?
transport: list(ble|classic)
channel: int
ports:
8765/tcp: 8765
ports_description:
8765/tcp: "Fichero Printer REST API"

View File

@@ -0,0 +1,25 @@
"""Fichero D11s thermal label printer - BLE + Classic Bluetooth interface."""
from fichero.printer import (
RFCOMM_CHANNEL,
PrinterClient,
PrinterError,
PrinterNotFound,
PrinterNotReady,
PrinterStatus,
PrinterTimeout,
RFCOMMClient,
connect,
)
__all__ = [
"RFCOMM_CHANNEL",
"PrinterClient",
"PrinterError",
"PrinterNotFound",
"PrinterNotReady",
"PrinterStatus",
"PrinterTimeout",
"RFCOMMClient",
"connect",
]

View File

@@ -0,0 +1,293 @@
"""HTTP REST API for the Fichero D11s thermal label printer.
Start with:
fichero-server [--host HOST] [--port PORT]
or:
python -m fichero.api
Endpoints:
GET /status Printer status
GET /info Printer info (model, firmware, battery, …)
POST /print/text Print a text label
POST /print/image Print an uploaded image file
"""
from __future__ import annotations
import argparse
import asyncio
import io
import os
from contextlib import asynccontextmanager
from typing import Annotated
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image
from fichero.cli import DOTS_PER_MM, do_print
from fichero.imaging import text_to_image
from fichero.printer import (
PAPER_GAP,
PrinterError,
PrinterNotFound,
PrinterTimeout,
connect,
)
# ---------------------------------------------------------------------------
# Global connection settings (env vars or CLI flags at startup)
# ---------------------------------------------------------------------------
_DEFAULT_ADDRESS: str | None = os.environ.get("FICHERO_ADDR")
_DEFAULT_CLASSIC: bool = os.environ.get("FICHERO_TRANSPORT", "").lower() == "classic"
_DEFAULT_CHANNEL: int = int(os.environ.get("FICHERO_CHANNEL", "1"))
_PAPER_MAP = {"gap": 0, "black": 1, "continuous": 2}
def _parse_paper(value: str) -> int:
if value in _PAPER_MAP:
return _PAPER_MAP[value]
try:
val = int(value)
if 0 <= val <= 2:
return val
except ValueError:
pass
raise HTTPException(status_code=422, detail=f"Invalid paper type '{value}'. Use gap, black, continuous or 0-2.")
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI): # noqa: ARG001
yield
app = FastAPI(
title="Fichero Printer API",
description="REST API for the Fichero D11s (AiYin) thermal label printer.",
version="0.1.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
def _address(address: str | None) -> str | None:
"""Return the effective BLE address (request value overrides env default)."""
return address or _DEFAULT_ADDRESS
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get(
"/status",
summary="Get printer status",
response_description="Current printer status flags",
)
async def get_status(
address: str | None = None,
classic: bool = _DEFAULT_CLASSIC,
channel: int = _DEFAULT_CHANNEL,
):
"""Return the real-time status of the printer (paper, battery, heat, …)."""
try:
async with connect(_address(address), classic=classic, channel=channel) as pc:
status = await pc.get_status()
except PrinterNotFound as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except PrinterTimeout as exc:
raise HTTPException(status_code=504, detail=str(exc)) from exc
except PrinterError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
return {
"ok": status.ok,
"printing": status.printing,
"cover_open": status.cover_open,
"no_paper": status.no_paper,
"low_battery": status.low_battery,
"overheated": status.overheated,
"charging": status.charging,
"raw": status.raw,
}
@app.get(
"/info",
summary="Get printer info",
response_description="Model, firmware, serial number and battery level",
)
async def get_info(
address: str | None = None,
classic: bool = _DEFAULT_CLASSIC,
channel: int = _DEFAULT_CHANNEL,
):
"""Return static and dynamic printer information."""
try:
async with connect(_address(address), classic=classic, channel=channel) as pc:
info = await pc.get_info()
info.update(await pc.get_all_info())
except PrinterNotFound as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except PrinterTimeout as exc:
raise HTTPException(status_code=504, detail=str(exc)) from exc
except PrinterError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
return info
@app.post(
"/print/text",
summary="Print a text label",
status_code=200,
)
async def print_text(
text: Annotated[str, Form(description="Text to print on the label")],
density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
font_size: Annotated[int, Form(description="Font size in points", ge=6, le=200)] = 30,
label_length: Annotated[int | None, Form(description="Label length in mm (overrides label_height)", ge=5, le=500)] = None,
label_height: Annotated[int, Form(description="Label height in pixels", ge=40, le=4000)] = 240,
address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
classic: Annotated[bool, Form(description="Use Classic Bluetooth RFCOMM")] = _DEFAULT_CLASSIC,
channel: Annotated[int, Form(description="RFCOMM channel")] = _DEFAULT_CHANNEL,
):
"""Print a plain-text label.
The text is rendered as a 96 px wide, 1-bit image and sent to the printer.
"""
paper_val = _parse_paper(paper)
max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height
img = text_to_image(text, font_size=font_size, label_height=max_rows)
try:
async with connect(_address(address), classic=classic, channel=channel) as pc:
ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies,
dither=False, max_rows=max_rows)
except PrinterNotFound as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except PrinterTimeout as exc:
raise HTTPException(status_code=504, detail=str(exc)) from exc
except PrinterError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
if not ok:
raise HTTPException(status_code=502, detail="Printer did not confirm completion.")
return {"ok": True, "copies": copies, "text": text}
@app.post(
"/print/image",
summary="Print an image",
status_code=200,
)
async def print_image(
file: Annotated[UploadFile, File(description="Image file to print (PNG, JPEG, BMP, …)")],
density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
dither: Annotated[bool, Form(description="Apply Floyd-Steinberg dithering")] = True,
label_length: Annotated[int | None, Form(description="Max label length in mm (overrides label_height)", ge=5, le=500)] = None,
label_height: Annotated[int, Form(description="Max label height in pixels", ge=40, le=4000)] = 240,
address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
classic: Annotated[bool, Form(description="Use Classic Bluetooth RFCOMM")] = _DEFAULT_CLASSIC,
channel: Annotated[int, Form(description="RFCOMM channel")] = _DEFAULT_CHANNEL,
):
"""Print an image file.
The image is resized to 96 px wide, optionally dithered to 1-bit, and sent to the printer.
Supported formats: PNG, JPEG, BMP, GIF, TIFF, WEBP.
"""
# Validate content type loosely — Pillow will raise on unsupported data
data = await file.read()
if not data:
raise HTTPException(status_code=422, detail="Uploaded file is empty.")
try:
img = Image.open(io.BytesIO(data))
img.load() # ensure the image is fully decoded
except Exception as exc:
raise HTTPException(status_code=422, detail=f"Cannot decode image: {exc}") from exc
paper_val = _parse_paper(paper)
max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height
try:
async with connect(_address(address), classic=classic, channel=channel) as pc:
ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies,
dither=dither, max_rows=max_rows)
except PrinterNotFound as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except PrinterTimeout as exc:
raise HTTPException(status_code=504, detail=str(exc)) from exc
except PrinterError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
if not ok:
raise HTTPException(status_code=502, detail="Printer did not confirm completion.")
return {"ok": True, "copies": copies, "filename": file.filename}
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
"""Start the Fichero HTTP API server."""
try:
import uvicorn # noqa: PLC0415
except ImportError:
print("ERROR: uvicorn is required to run the API server.")
print("Install it with: pip install 'fichero-printer[api]'")
raise SystemExit(1) from None
parser = argparse.ArgumentParser(description="Fichero Printer API Server")
parser.add_argument("--host", default="127.0.0.1", help="Bind host (default: 127.0.0.1)")
parser.add_argument("--port", type=int, default=8765, help="Bind port (default: 8765)")
parser.add_argument("--address", default=_DEFAULT_ADDRESS, metavar="BLE_ADDR",
help="Default BLE address (or set FICHERO_ADDR env var)")
parser.add_argument("--classic", action="store_true", default=_DEFAULT_CLASSIC,
help="Default to Classic Bluetooth RFCOMM")
parser.add_argument("--channel", type=int, default=_DEFAULT_CHANNEL,
help="Default RFCOMM channel (default: 1)")
parser.add_argument("--reload", action="store_true", help="Enable auto-reload (development)")
args = parser.parse_args()
# Push CLI overrides into module-level defaults so all handlers pick them up
global _DEFAULT_ADDRESS, _DEFAULT_CLASSIC, _DEFAULT_CHANNEL
_DEFAULT_ADDRESS = args.address
_DEFAULT_CLASSIC = args.classic
_DEFAULT_CHANNEL = args.channel
# Pass the app object directly when not reloading so that the module-level
# globals (_DEFAULT_ADDRESS etc.) set above are visible to the handlers.
# The string form "fichero.api:app" is required for --reload only, because
# uvicorn's reloader needs to re-import the module in a worker process.
uvicorn.run(
"fichero.api:app" if args.reload else app,
host=args.host,
port=args.port,
reload=args.reload,
)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,251 @@
"""CLI for Fichero D11s thermal label printer."""
import argparse
import asyncio
import os
import sys
from PIL import Image
from fichero.imaging import image_to_raster, prepare_image, text_to_image
from fichero.printer import (
BYTES_PER_ROW,
DELAY_AFTER_DENSITY,
DELAY_AFTER_FEED,
DELAY_COMMAND_GAP,
DELAY_RASTER_SETTLE,
PAPER_GAP,
PrinterClient,
PrinterError,
PrinterNotReady,
connect,
)
DOTS_PER_MM = 8 # 203 DPI
def _resolve_label_height(args: argparse.Namespace) -> int:
"""Return label height in pixels from --label-length (mm) or --label-height (px)."""
if args.label_length is not None:
return args.label_length * DOTS_PER_MM
return args.label_height
async def do_print(
pc: PrinterClient,
img: Image.Image,
density: int = 1,
paper: int = PAPER_GAP,
copies: int = 1,
dither: bool = True,
max_rows: int = 240,
) -> bool:
img = prepare_image(img, max_rows=max_rows, dither=dither)
rows = img.height
raster = image_to_raster(img)
print(f" Image: {img.width}x{rows}, {len(raster)} bytes, {copies} copies")
await pc.set_density(density)
await asyncio.sleep(DELAY_AFTER_DENSITY)
for copy_num in range(copies):
if copies > 1:
print(f" Copy {copy_num + 1}/{copies}...")
# Check status before each copy (matches decompiled app behaviour)
status = await pc.get_status()
if not status.ok:
raise PrinterNotReady(f"Printer not ready: {status}")
# AiYin print sequence (from decompiled APK)
await pc.set_paper_type(paper)
await asyncio.sleep(DELAY_COMMAND_GAP)
await pc.wakeup()
await asyncio.sleep(DELAY_COMMAND_GAP)
await pc.enable()
await asyncio.sleep(DELAY_COMMAND_GAP)
# Raster image: GS v 0 m xL xH yL yH <data>
yl = rows & 0xFF
yh = (rows >> 8) & 0xFF
header = bytes([0x1D, 0x76, 0x30, 0x00, BYTES_PER_ROW, 0x00, yl, yh])
await pc.send_chunked(header + raster)
await asyncio.sleep(DELAY_RASTER_SETTLE)
await pc.form_feed()
await asyncio.sleep(DELAY_AFTER_FEED)
ok = await pc.stop_print()
if not ok:
print(" WARNING: no OK/0xAA from stop command")
return True
async def cmd_info(args: argparse.Namespace) -> None:
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
info = await pc.get_info()
for k, v in info.items():
print(f" {k}: {v}")
print()
all_info = await pc.get_all_info()
for k, v in all_info.items():
print(f" {k}: {v}")
async def cmd_status(args: argparse.Namespace) -> None:
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
status = await pc.get_status()
print(f" Status: {status}")
print(f" Raw: 0x{status.raw:02X} ({status.raw:08b})")
print(f" printing={status.printing} cover_open={status.cover_open} "
f"no_paper={status.no_paper} low_battery={status.low_battery} "
f"overheated={status.overheated} charging={status.charging}")
async def cmd_text(args: argparse.Namespace) -> None:
text = " ".join(args.text)
label_h = _resolve_label_height(args)
img = text_to_image(text, font_size=args.font_size, label_height=label_h)
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
print(f'Printing "{text}"...')
ok = await do_print(pc, img, args.density, paper=args.paper,
copies=args.copies, dither=False, max_rows=label_h)
print("Done." if ok else "FAILED.")
async def cmd_image(args: argparse.Namespace) -> None:
img = Image.open(args.path)
label_h = _resolve_label_height(args)
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
print(f"Printing {args.path}...")
ok = await do_print(pc, img, args.density, paper=args.paper,
copies=args.copies, dither=not args.no_dither,
max_rows=label_h)
print("Done." if ok else "FAILED.")
async def cmd_set(args: argparse.Namespace) -> None:
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
if args.setting == "density":
val = int(args.value)
if not 0 <= val <= 2:
print(" ERROR: density must be 0, 1, or 2")
return
ok = await pc.set_density(val)
print(f" Set density={args.value}: {'OK' if ok else 'FAILED'}")
elif args.setting == "shutdown":
val = int(args.value)
if not 1 <= val <= 480:
print(" ERROR: shutdown must be 1-480 minutes")
return
ok = await pc.set_shutdown_time(val)
print(f" Set shutdown={args.value}min: {'OK' if ok else 'FAILED'}")
elif args.setting == "paper":
types = {"gap": 0, "black": 1, "continuous": 2}
if args.value in types:
val = types[args.value]
else:
try:
val = int(args.value)
except ValueError:
print(" ERROR: paper must be gap, black, continuous, or 0-2")
return
if not 0 <= val <= 2:
print(" ERROR: paper must be gap, black, continuous, or 0-2")
return
ok = await pc.set_paper_type(val)
print(f" Set paper={args.value}: {'OK' if ok else 'FAILED'}")
def _add_paper_arg(parser: argparse.ArgumentParser) -> None:
"""Add --paper argument to a subparser."""
parser.add_argument(
"--paper", type=str, default="gap",
help="Paper type: gap (default), black, continuous",
)
def _parse_paper(value: str) -> int:
"""Convert paper string/int to protocol value."""
types = {"gap": 0, "black": 1, "continuous": 2}
if value in types:
return types[value]
try:
val = int(value)
if 0 <= val <= 2:
return val
except ValueError:
pass
print(f" WARNING: unknown paper type '{value}', using gap")
return 0
def main() -> None:
parser = argparse.ArgumentParser(description="Fichero D11s Label Printer")
parser.add_argument("--address", default=os.environ.get("FICHERO_ADDR"),
help="BLE address (skip scanning, or set FICHERO_ADDR)")
parser.add_argument("--classic", action="store_true",
default=os.environ.get("FICHERO_TRANSPORT", "").lower() == "classic",
help="Use Classic Bluetooth (RFCOMM) instead of BLE (Linux only, "
"or set FICHERO_TRANSPORT=classic)")
parser.add_argument("--channel", type=int, default=1,
help="RFCOMM channel (default: 1, only used with --classic)")
sub = parser.add_subparsers(dest="command", required=True)
p_info = sub.add_parser("info", help="Show device info")
p_info.set_defaults(func=cmd_info)
p_status = sub.add_parser("status", help="Show detailed status")
p_status.set_defaults(func=cmd_status)
p_text = sub.add_parser("text", help="Print text label")
p_text.add_argument("text", nargs="+", help="Text to print")
p_text.add_argument("--density", type=int, default=2, choices=[0, 1, 2],
help="Print density: 0=light, 1=medium, 2=thick")
p_text.add_argument("--copies", type=int, default=1, help="Number of copies")
p_text.add_argument("--font-size", type=int, default=30, help="Font size in points")
p_text.add_argument("--label-length", type=int, default=None,
help="Label length in mm (default: 30mm)")
p_text.add_argument("--label-height", type=int, default=240,
help="Label height in pixels (default: 240, prefer --label-length)")
_add_paper_arg(p_text)
p_text.set_defaults(func=cmd_text)
p_image = sub.add_parser("image", help="Print image file")
p_image.add_argument("path", help="Path to image file")
p_image.add_argument("--density", type=int, default=2, choices=[0, 1, 2],
help="Print density: 0=light, 1=medium, 2=thick")
p_image.add_argument("--copies", type=int, default=1, help="Number of copies")
p_image.add_argument("--no-dither", action="store_true",
help="Disable Floyd-Steinberg dithering (use simple threshold)")
p_image.add_argument("--label-length", type=int, default=None,
help="Label length in mm (default: 30mm)")
p_image.add_argument("--label-height", type=int, default=240,
help="Max image height in pixels (default: 240, prefer --label-length)")
_add_paper_arg(p_image)
p_image.set_defaults(func=cmd_image)
p_set = sub.add_parser("set", help="Change printer settings")
p_set.add_argument("setting", choices=["density", "shutdown", "paper"],
help="Setting to change")
p_set.add_argument("value", help="New value")
p_set.set_defaults(func=cmd_set)
args = parser.parse_args()
# Resolve --paper string to int for print commands
if hasattr(args, "paper") and isinstance(args.paper, str):
args.paper = _parse_paper(args.paper)
try:
asyncio.run(args.func(args))
except PrinterError as e:
print(f" ERROR: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,97 @@
"""Image processing for Fichero D11s thermal label printer."""
import logging
import numpy as np
from PIL import Image, ImageDraw, ImageFont, ImageOps
from fichero.printer import PRINTHEAD_PX
log = logging.getLogger(__name__)
def floyd_steinberg_dither(img: Image.Image) -> Image.Image:
"""Floyd-Steinberg error-diffusion dithering to 1-bit.
Same algorithm as PrinterImageProcessor.ditherFloydSteinberg() in the
decompiled Fichero APK: distributes quantisation error to neighbouring
pixels with weights 7/16, 3/16, 5/16, 1/16.
"""
arr = np.array(img, dtype=np.float32)
h, w = arr.shape
for y in range(h):
for x in range(w):
old = arr[y, x]
new = 0.0 if old < 128 else 255.0
arr[y, x] = new
err = old - new
if x + 1 < w:
arr[y, x + 1] += err * 7 / 16
if y + 1 < h:
if x - 1 >= 0:
arr[y + 1, x - 1] += err * 3 / 16
arr[y + 1, x] += err * 5 / 16
if x + 1 < w:
arr[y + 1, x + 1] += err * 1 / 16
arr = np.clip(arr, 0, 255).astype(np.uint8)
return Image.fromarray(arr, mode="L")
def prepare_image(
img: Image.Image, max_rows: int = 240, dither: bool = True
) -> Image.Image:
"""Convert any image to 96px wide, 1-bit, black on white.
When *dither* is True (default), uses Floyd-Steinberg error diffusion
for better quality on photos and gradients. Set False for crisp text.
"""
img = img.convert("L")
w, h = img.size
new_h = int(h * (PRINTHEAD_PX / w))
img = img.resize((PRINTHEAD_PX, new_h), Image.LANCZOS)
if new_h > max_rows:
log.warning("Image height %dpx exceeds max %dpx, cropping bottom", new_h, max_rows)
img = img.crop((0, 0, PRINTHEAD_PX, max_rows))
img = ImageOps.autocontrast(img, cutoff=1)
if dither:
img = floyd_steinberg_dither(img)
# Pack to 1-bit. PIL mode "1" tobytes() uses 0-bit=black, 1-bit=white,
# but the printer wants 1-bit=black. Mapping dark->1 via point() inverts
# the PIL convention so the final packed bits match what the printer needs.
img = img.point(lambda x: 1 if x < 128 else 0, "1")
return img
def image_to_raster(img: Image.Image) -> bytes:
"""Pack 1-bit image into raw raster bytes, MSB first."""
if img.mode != "1":
raise ValueError(f"Expected mode '1', got '{img.mode}'")
if img.width != PRINTHEAD_PX:
raise ValueError(f"Expected width {PRINTHEAD_PX}, got {img.width}")
return img.tobytes()
def text_to_image(text: str, font_size: int = 30, label_height: int = 240) -> Image.Image:
"""Render crisp 1-bit text, rotated 90 degrees for label printing."""
canvas_w = label_height
canvas_h = PRINTHEAD_PX
img = Image.new("L", (canvas_w, canvas_h), 255)
draw = ImageDraw.Draw(img)
draw.fontmode = "1" # disable antialiasing - pure 1-bit glyph rendering
font = ImageFont.load_default(size=font_size)
bbox = draw.textbbox((0, 0), text, font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
x = (canvas_w - tw) // 2 - bbox[0]
y = (canvas_h - th) // 2 - bbox[1]
draw.text((x, y), text, fill=0, font=font)
img = img.rotate(90, expand=True)
return img

View File

@@ -0,0 +1,380 @@
"""
Fichero / D11s thermal label printer - BLE + Classic Bluetooth interface.
Protocol reverse-engineered from decompiled Fichero APK (com.lj.fichero).
Device class: AiYinNormalDevice (LuckPrinter SDK)
96px wide printhead (12 bytes/row), 203 DPI, prints 1-bit raster images.
"""
import asyncio
import sys
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from bleak import BleakClient, BleakGATTCharacteristic, BleakScanner
# --- RFCOMM (Classic Bluetooth) support - Linux + Windows (Python 3.9+) ---
_RFCOMM_AVAILABLE = False
if sys.platform in ("linux", "win32"):
import socket as _socket
_RFCOMM_AVAILABLE = hasattr(_socket, "AF_BLUETOOTH")
RFCOMM_CHANNEL = 1
# --- BLE identifiers ---
PRINTER_NAME_PREFIXES = ("FICHERO", "D11s_")
# Using the 18f0 service (any of the four BLE UART services work)
WRITE_UUID = "00002af1-0000-1000-8000-00805f9b34fb"
NOTIFY_UUID = "00002af0-0000-1000-8000-00805f9b34fb"
# --- Printhead ---
PRINTHEAD_PX = 96
BYTES_PER_ROW = PRINTHEAD_PX // 8 # 12
CHUNK_SIZE_BLE = 200 # BLE MTU-limited
CHUNK_SIZE_CLASSIC = 16384 # from decompiled app (C1703d.java), stream-based
# --- Paper types for 10 FF 84 nn ---
PAPER_GAP = 0x00
PAPER_BLACK_MARK = 0x01
PAPER_CONTINUOUS = 0x02
# --- Timing (seconds) - empirically tuned against D11s fw 2.4.6 ---
DELAY_AFTER_DENSITY = 0.10 # printer needs time to apply density setting
DELAY_COMMAND_GAP = 0.05 # minimum gap between sequential commands
DELAY_CHUNK_GAP = 0.02 # inter-chunk pacing for BLE throughput
DELAY_RASTER_SETTLE = 0.50 # wait for printhead after raster transfer
DELAY_AFTER_FEED = 0.30 # wait after form feed before stop command
DELAY_NOTIFY_EXTRA = 0.05 # extra wait for trailing BLE notification fragments
# --- Exceptions ---
class PrinterError(Exception):
"""Base exception for printer operations."""
class PrinterNotFound(PrinterError):
"""No Fichero/D11s printer found during BLE scan."""
class PrinterTimeout(PrinterError):
"""Printer did not respond within the expected time."""
class PrinterNotReady(PrinterError):
"""Printer status indicates it cannot print."""
# --- Discovery ---
async def find_printer() -> str:
"""Scan BLE for a Fichero/D11s printer. Returns the address."""
print("Scanning for printer...")
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.name and any(d.name.startswith(p) for p in PRINTER_NAME_PREFIXES):
print(f" Found {d.name} at {d.address}")
return d.address
raise PrinterNotFound("No Fichero/D11s printer found. Is it turned on?")
# --- Status ---
class PrinterStatus:
"""Parsed status byte from 10 FF 40."""
def __init__(self, byte: int):
self.raw = byte
self.printing = bool(byte & 0x01)
self.cover_open = bool(byte & 0x02)
self.no_paper = bool(byte & 0x04)
self.low_battery = bool(byte & 0x08)
self.overheated = bool(byte & 0x10 or byte & 0x40)
self.charging = bool(byte & 0x20)
def __str__(self) -> str:
flags = []
if self.printing:
flags.append("printing")
if self.cover_open:
flags.append("cover open")
if self.no_paper:
flags.append("no paper")
if self.low_battery:
flags.append("low battery")
if self.overheated:
flags.append("overheated")
if self.charging:
flags.append("charging")
return ", ".join(flags) if flags else "ready"
@property
def ok(self) -> bool:
return not (self.cover_open or self.no_paper or self.overheated)
# --- RFCOMM client (duck-types the BleakClient interface) ---
class RFCOMMClient:
"""Classic Bluetooth (RFCOMM) transport. Linux + Windows (Python 3.9+).
Implements the same async context manager + write_gatt_char/start_notify
interface that PrinterClient expects from BleakClient. Zero dependencies
beyond stdlib.
"""
is_classic = True # transport marker for PrinterClient chunk sizing
def __init__(self, address: str, channel: int = RFCOMM_CHANNEL):
self._address = address
self._channel = channel
self._sock: "_socket.socket | None" = None
self._reader_task: asyncio.Task | None = None
async def __aenter__(self) -> "RFCOMMClient":
if not _RFCOMM_AVAILABLE:
raise PrinterError(
"RFCOMM transport requires socket.AF_BLUETOOTH "
"(Linux with BlueZ, or Windows with Python 3.9+). "
"Not available on this platform."
)
import socket as _socket
sock = _socket.socket(
_socket.AF_BLUETOOTH, _socket.SOCK_STREAM, _socket.BTPROTO_RFCOMM
)
sock.setblocking(False)
loop = asyncio.get_running_loop()
try:
await asyncio.wait_for(
loop.sock_connect(sock, (self._address, self._channel)),
timeout=10.0,
)
except Exception:
sock.close()
raise
self._sock = sock
return self
async def __aexit__(self, *exc) -> None:
if self._reader_task is not None:
self._reader_task.cancel()
try:
await self._reader_task
except asyncio.CancelledError:
pass
self._reader_task = None
if self._sock is not None:
self._sock.close()
self._sock = None
async def write_gatt_char(self, _uuid: str, data: bytes, response: bool = False) -> None:
loop = asyncio.get_running_loop()
await loop.sock_sendall(self._sock, data)
async def start_notify(self, _uuid: str, callback) -> None:
self._reader_task = asyncio.create_task(self._reader_loop(callback))
async def _reader_loop(self, callback) -> None:
loop = asyncio.get_running_loop()
while True:
try:
data = await loop.sock_recv(self._sock, 1024)
except (OSError, asyncio.CancelledError):
return
if not data:
return
callback(None, bytearray(data))
# --- Client ---
class PrinterClient:
def __init__(self, client: BleakClient):
self.client = client
self._buf = bytearray()
self._event = asyncio.Event()
self._lock = asyncio.Lock()
self._is_classic = getattr(client, "is_classic", False)
def _on_notify(self, _char: BleakGATTCharacteristic, data: bytearray) -> None:
self._buf.extend(data)
self._event.set()
async def start(self) -> None:
await self.client.start_notify(NOTIFY_UUID, self._on_notify)
async def send(self, data: bytes, wait: bool = False, timeout: float = 2.0) -> bytes:
async with self._lock:
if wait:
self._buf.clear()
self._event.clear()
await self.client.write_gatt_char(WRITE_UUID, data, response=False)
if wait:
try:
await asyncio.wait_for(self._event.wait(), timeout=timeout)
await asyncio.sleep(DELAY_NOTIFY_EXTRA)
except asyncio.TimeoutError:
raise PrinterTimeout(f"No response within {timeout}s")
return bytes(self._buf)
async def send_chunked(self, data: bytes, chunk_size: int | None = None) -> None:
if chunk_size is None:
chunk_size = CHUNK_SIZE_CLASSIC if self._is_classic else CHUNK_SIZE_BLE
delay = 0 if self._is_classic else DELAY_CHUNK_GAP
async with self._lock:
for i in range(0, len(data), chunk_size):
chunk = data[i : i + chunk_size]
await self.client.write_gatt_char(WRITE_UUID, chunk, response=False)
if delay:
await asyncio.sleep(delay)
# --- Info commands (all tested and confirmed on D11s fw 2.4.6) ---
async def get_model(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF0]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_firmware(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF1]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_serial(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF2]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_boot_version(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xEF]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_battery(self) -> int:
r = await self.send(bytes([0x10, 0xFF, 0x50, 0xF1]), wait=True)
if r and len(r) >= 2:
return r[-1]
return -1
async def get_status(self) -> PrinterStatus:
r = await self.send(bytes([0x10, 0xFF, 0x40]), wait=True)
if r:
return PrinterStatus(r[-1])
return PrinterStatus(0xFF)
async def get_density(self) -> bytes:
r = await self.send(bytes([0x10, 0xFF, 0x11]), wait=True)
return r
async def get_shutdown_time(self) -> int:
"""Returns auto-shutdown timeout in minutes."""
r = await self.send(bytes([0x10, 0xFF, 0x13]), wait=True)
if r and len(r) >= 2:
return (r[0] << 8) | r[1]
return -1
async def get_all_info(self) -> dict:
"""10 FF 70: returns pipe-delimited string with all device info."""
r = await self.send(bytes([0x10, 0xFF, 0x70]), wait=True)
if not r:
return {}
parts = r.decode(errors="replace").split("|")
if len(parts) >= 6:
return {
"bt_name": parts[0],
"mac_classic": parts[1],
"mac_ble": parts[2],
"firmware": parts[3],
"serial": parts[4],
"battery": f"{parts[5]}%",
}
return {"raw": r.decode(errors="replace")}
# --- Config commands (tested on D11s) ---
async def set_density(self, level: int) -> bool:
"""0=light, 1=medium, 2=thick. Returns True if printer responded OK."""
r = await self.send(bytes([0x10, 0xFF, 0x10, 0x00, level]), wait=True)
return r == b"OK"
async def set_paper_type(self, paper: int = PAPER_GAP) -> bool:
"""0=gap/label, 1=black mark, 2=continuous."""
r = await self.send(bytes([0x10, 0xFF, 0x84, paper]), wait=True)
return r == b"OK"
async def set_shutdown_time(self, minutes: int) -> bool:
hi = (minutes >> 8) & 0xFF
lo = minutes & 0xFF
r = await self.send(bytes([0x10, 0xFF, 0x12, hi, lo]), wait=True)
return r == b"OK"
async def factory_reset(self) -> bool:
r = await self.send(bytes([0x10, 0xFF, 0x04]), wait=True)
return r == b"OK"
# --- Print control (AiYin-specific, from decompiled APK) ---
async def wakeup(self) -> None:
await self.send(b"\x00" * 12)
async def enable(self) -> None:
"""AiYin enable: 10 FF FE 01 (NOT 10 FF F1 03)."""
await self.send(bytes([0x10, 0xFF, 0xFE, 0x01]))
async def feed_dots(self, dots: int) -> None:
"""Feed paper forward by n dots."""
await self.send(bytes([0x1B, 0x4A, dots & 0xFF]))
async def form_feed(self) -> None:
"""Position to next label."""
await self.send(bytes([0x1D, 0x0C]))
async def stop_print(self) -> bool:
"""AiYin stop: 10 FF FE 45. Waits for 0xAA or 'OK'."""
r = await self.send(bytes([0x10, 0xFF, 0xFE, 0x45]), wait=True, timeout=60.0)
if r:
return r[0] == 0xAA or r.startswith(b"OK")
return False
async def get_info(self) -> dict:
status = await self.get_status()
return {
"model": await self.get_model(),
"firmware": await self.get_firmware(),
"boot": await self.get_boot_version(),
"serial": await self.get_serial(),
"battery": f"{await self.get_battery()}%",
"status": str(status),
"shutdown": f"{await self.get_shutdown_time()} min",
}
@asynccontextmanager
async def connect(
address: str | None = None,
classic: bool = False,
channel: int = RFCOMM_CHANNEL,
) -> AsyncGenerator[PrinterClient, None]:
"""Discover printer, connect, and yield a ready PrinterClient."""
if classic:
if not address:
raise PrinterError("--address is required for Classic Bluetooth (no scanning)")
async with RFCOMMClient(address, channel) as client:
pc = PrinterClient(client)
await pc.start()
yield pc
else:
addr = address or await find_printer()
async with BleakClient(addr) as client:
pc = PrinterClient(client)
await pc.start()
yield pc

29
fichero_printer/run.sh Normal file
View File

@@ -0,0 +1,29 @@
#!/usr/bin/with-contenv bashio
# shellcheck shell=bash
set -e
declare port
declare ble_address
declare transport
declare channel
port=$(bashio::config 'port')
transport=$(bashio::config 'transport')
channel=$(bashio::config 'channel')
# Pass connection settings to the Python module via environment variables.
# The module reads these at import time, so they must be exported before uvicorn
# imports fichero.api.
export FICHERO_TRANSPORT="${transport}"
export FICHERO_CHANNEL="${channel}"
ble_address=$(bashio::config 'ble_address')
if [ -n "${ble_address}" ]; then
export FICHERO_ADDR="${ble_address}"
bashio::log.info "Using fixed Bluetooth address: ${ble_address}"
else
bashio::log.info "No address configured will auto-scan for printer on first request."
fi
bashio::log.info "Starting Fichero Printer API on 0.0.0.0:${port} (transport: ${transport})..."
exec uvicorn fichero.api:app --host 0.0.0.0 --port "${port}"

View File

@@ -0,0 +1,13 @@
configuration:
port:
name: "API-Port"
description: "Port des REST-API-Servers. Den obigen Port-Mapping-Eintrag entsprechend anpassen."
ble_address:
name: "Bluetooth-Adresse"
description: "Feste BLE-Adresse des Druckers (z.B. AA:BB:CC:DD:EE:FF). Leer lassen für automatischen Scan."
transport:
name: "Transport"
description: "Verbindungsart: 'ble' für Bluetooth Low Energy (Standard) oder 'classic' für RFCOMM."
channel:
name: "RFCOMM-Kanal"
description: "Classic-Bluetooth-RFCOMM-Kanal. Nur relevant wenn Transport auf 'classic' gesetzt ist."