13 Commits

Author SHA1 Message Date
paul2212
48c40d9f8f 0.1.27 2026-03-16 14:17:49 +01:00
a7fabdae54 Merge pull request '0.1.25' (#6) from 0.1.25 into main
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled
Reviewed-on: #6
2026-03-16 12:48:37 +00:00
paul2212
fef3d18d3f 0.1.25 2026-03-16 13:48:12 +01:00
66c3f06b48 Merge pull request 'refactor' (#5) from 0.1.23 into main
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled
Reviewed-on: #5
2026-03-16 11:44:45 +00:00
paul2212
7843a38407 refactor 2026-03-16 12:43:26 +01:00
eee58431ab update version
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled
2026-03-16 09:26:06 +00:00
40a1f78f55 Merge pull request 'release/0.1.20' (#4) from release/0.1.20 into main
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled
Reviewed-on: #4
2026-03-16 09:21:35 +00:00
paul2212
16886bfa21 add files 2026-03-16 10:16:12 +01:00
paul2212
8520a88197 refactor: Externalize web UI to index.html
Refactors the embedded web UI in the API server to be loaded from a
separate index.html file instead of a large inline string.

This improves maintainability by separating the presentation layer
(HTML/CSS/JS) from the backend Python logic.
2026-03-16 10:15:22 +01:00
paul2212
1a51ebb122 Retry BLE with fresh LE scan on br-connection-not-supported (0.1.15)
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled
2026-03-07 22:56:13 +01:00
paul2212
92a7224774 Avoid raw MAC BLE fallback and bump to 0.1.14
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled
2026-03-07 22:50:57 +01:00
9f191b564a Bump version to 0.1.13 in API and package.json
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled
2026-03-07 15:19:03 +01:00
paul2212
42e56e1b9f Retry BLE service-discovery disconnect errors and bump to 0.1.13
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled
2026-03-07 15:12:56 +01:00
18 changed files with 766 additions and 1516 deletions

View File

@@ -4,6 +4,94 @@ All notable changes to this project are documented in this file.
The format is based on Keep a Changelog and this project uses Semantic Versioning.
## [0.1.27] - 2026-03-16
### Changed
- **Project Structure**: Moved the `fichero` library into `fichero_printer/` to make the add-on self-contained. This simplifies the build process and removes the need for synchronization scripts.
- Fixed invalid duplicate `version` keys in `pyproject.toml` and `config.yaml`.
## [0.1.26] - 2026-03-16
### Fixed
- **Build Process**: Fixed `too many links` Docker build error by removing the symlink-based approach. Introduced a `sync_addon.sh` script to automate copying the library into the add-on directory, which is required for the Home Assistant build system.
## [0.1.25] - 2026-03-08
### Changed
- **Build Process**: Replaced the manually copied `fichero` directory inside the Home Assistant add-on with a symbolic link. This eliminates code duplication and automates synchronization, simplifying the build process.
## [0.1.24] - 2026-03-08
### Fixed
- **Home Assistant Build**: Reverted the add-on's `Dockerfile` to a vendored code approach to resolve build failures caused by the Home Assistant build system's inability to access files outside the add-on directory. The add-on is now self-contained again.
## [0.1.23] - 2026-03-08
### Changed
- Updated the Home Assistant add-on's `Dockerfile` to install the main library as a package, completing the project structure refactoring.
- Added `python-multipart` as an explicit dependency for the API server.
## [0.1.22] - 2026-03-08
### Changed
- **Refactored Project Structure**: Eliminated duplicated code by converting the project into an installable Python package. The Home Assistant add-on now installs the main library as a dependency instead of using a vendored copy, improving maintainability and preventing sync issues.
## [0.1.21] - 2026-03-08
### Fixed
- Synchronized the Home Assistant add-on's source code (`fichero_printer/fichero/`) with the main library to fix stale code and version mismatch issues.
## [0.1.20] - 2026-03-08
### Changed
- Refactored the embedded web UI in the API server to be loaded from a separate `index.html` file instead of a large inline string, improving maintainability.
## [0.1.19] - 2026-03-08
### Added
- Added `POST /unpair` endpoint and "Unpair Device" button in the web UI to remove a Bluetooth device from the host's paired devices.
## [0.1.18] - 2026-03-08
### Added
- Added `POST /pair` endpoint and "Pair Device" button in the web UI to easily pair/trust the printer via `bluetoothctl` for Classic Bluetooth connections.
## [0.1.17] - 2026-03-08
### Added
- Added automatic fallback to BLE connection if Classic Bluetooth (RFCOMM) fails with `[Errno 12] Out of memory`, a common issue on Linux with stale device states.
## [0.1.16] - 2026-03-08
### Fixed
- Corrected typos in the Code128B bit pattern table for characters '$' (ASCII 36) and ')' (ASCII 41), which caused incorrect barcodes to be generated.
## [0.1.15] - 2026-03-07
### Fixed
- Added BLE recovery path for `br-connection-not-supported`: the connector now forces a fresh LE scan target resolution and retries before returning an error.
## [0.1.14] - 2026-03-07
### Fixed
- Removed BLE fallback to raw MAC string when device resolution fails. The connector now requires a discovered LE device object, avoiding BlueZ BR/EDR misclassification that can cause `br-connection-not-supported`.
## [0.1.13] - 2026-03-07
### Fixed
- Treated BLE service-discovery disconnects (`failed to discover services, device disconnected`) as retryable transient errors in the BLE connect loop.
## [0.1.12] - 2026-03-07
### Fixed

View File

@@ -140,6 +140,18 @@ asyncio.run(main())
The package exports `PrinterClient`, `connect`, `PrinterError`, `PrinterNotFound`, `PrinterTimeout`, `PrinterNotReady`, and `PrinterStatus`.
## Troubleshooting
### Classic Bluetooth: [Errno 12] Out of memory
If you encounter `[Errno 12] Out of memory` failures on Classic Bluetooth connections, it typically implies a stale state in the BlueZ stack or the printer's radio. As of v0.1.17, the library automatically falls back to a BLE connection when this specific error occurs.
If you wish to resolve the underlying Classic Bluetooth issue, these steps can help:
- **Power cycle the printer**: This clears the printer's radio state and is often the only fix if the device is rejecting RFCOMM.
- **Verify Pairing**: Classic Bluetooth (RFCOMM) requires the device to be paired and trusted in the OS. You can use the "Pair Device" or "Unpair Device" buttons in the Home Assistant add-on's web UI, or run `bluetoothctl pair <MAC>` and `bluetoothctl trust <MAC>` (or `bluetoothctl remove <MAC>`) on the host. Pairing is not required for BLE.
- **Restart Bluetooth**: `systemctl restart bluetooth` on the host can clear stuck socket handles.
## TODO
- [ ] Emoji support in text labels. The default Pillow font has no emoji glyphs, so they render as squares. Needs two-pass rendering: split text into emoji/non-emoji segments, render emoji with Apple Color Emoji (macOS) or Noto Color Emoji (Linux) using `embedded_color=True`, then composite onto the label.

View File

@@ -1,25 +0,0 @@
"""Fichero D11s thermal label printer - BLE + Classic Bluetooth interface."""
from fichero.printer import (
RFCOMM_CHANNEL,
PrinterClient,
PrinterError,
PrinterNotFound,
PrinterNotReady,
PrinterStatus,
PrinterTimeout,
RFCOMMClient,
connect,
)
__all__ = [
"RFCOMM_CHANNEL",
"PrinterClient",
"PrinterError",
"PrinterNotFound",
"PrinterNotReady",
"PrinterStatus",
"PrinterTimeout",
"RFCOMMClient",
"connect",
]

View File

@@ -5,6 +5,7 @@ Start with:
or:
python -m fichero.api
Endpoints:
GET /status Printer status
GET /info Printer info (model, firmware, battery, …)
@@ -17,8 +18,10 @@ from __future__ import annotations
import argparse
import asyncio
import io
import re
import os
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Annotated
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
@@ -72,7 +75,7 @@ async def lifespan(app: FastAPI): # noqa: ARG001
app = FastAPI(
title="Fichero Printer API",
description="REST API for the Fichero D11s (AiYin) thermal label printer.",
version="0.1.9",
version="0.1.27",
lifespan=lifespan,
docs_url=None,
redoc_url=None,
@@ -94,301 +97,20 @@ def _address(address: str | None) -> str | None:
def _ui_html() -> str:
default_address = _DEFAULT_ADDRESS or ""
default_transport = "classic" if _DEFAULT_CLASSIC else "ble"
return f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Fichero Printer</title>
<style>
:root {{
--bg: #f4efe6;
--panel: #fffaf2;
--line: #d8cdbd;
--ink: #2d241d;
--muted: #6c6258;
--accent: #b55e33;
--accent-2: #245b4b;
}}
* {{ box-sizing: border-box; }}
body {{
margin: 0;
font-family: "Noto Sans", system-ui, sans-serif;
color: var(--ink);
background:
radial-gradient(circle at top left, #fff8ed 0, transparent 35%),
linear-gradient(180deg, #efe4d3 0%, var(--bg) 100%);
}}
main {{
max-width: 980px;
margin: 0 auto;
padding: 24px 16px 40px;
}}
.hero {{
margin-bottom: 20px;
padding: 24px;
border: 1px solid var(--line);
border-radius: 18px;
background: rgba(255, 250, 242, 0.92);
backdrop-filter: blur(4px);
}}
h1, h2 {{ margin: 0 0 12px; }}
.muted {{ color: var(--muted); }}
.grid {{
display: grid;
gap: 16px;
grid-template-columns: repeat(auto-fit, minmax(280px, 1fr));
}}
.card {{
padding: 18px;
border: 1px solid var(--line);
border-radius: 16px;
background: var(--panel);
box-shadow: 0 8px 24px rgba(45, 36, 29, 0.06);
}}
label {{
display: block;
margin: 10px 0 6px;
font-size: 0.92rem;
font-weight: 600;
}}
input, select, textarea, button {{
width: 100%;
border-radius: 10px;
border: 1px solid var(--line);
padding: 10px 12px;
font: inherit;
}}
textarea {{ min-height: 110px; resize: vertical; }}
.row {{
display: grid;
gap: 12px;
grid-template-columns: repeat(2, minmax(0, 1fr));
}}
.inline {{
display: flex;
gap: 10px;
align-items: center;
margin-top: 12px;
}}
.inline input[type="checkbox"] {{ width: auto; }}
button {{
cursor: pointer;
font-weight: 700;
color: #fff;
background: var(--accent);
border: none;
}}
button.alt {{ background: var(--accent-2); }}
pre {{
overflow: auto;
margin: 0;
padding: 12px;
border-radius: 12px;
background: #241f1a;
color: #f7efe4;
min-height: 140px;
}}
.actions {{
display: flex;
gap: 10px;
flex-wrap: wrap;
margin-top: 12px;
}}
.actions button {{
width: auto;
min-width: 140px;
}}
@media (max-width: 640px) {{
.row {{ grid-template-columns: 1fr; }}
.actions button {{ width: 100%; }}
}}
</style>
</head>
<body>
<main>
<section class="hero">
<h1>Fichero Printer</h1>
<p class="muted">Home Assistant print console for status, text labels, and image uploads.</p>
<p class="muted">API docs remain available at <a href="docs">/docs</a>.</p>
</section>
<section class="grid">
<div class="card">
<h2>Connection</h2>
<label for="address">Printer address</label>
<input id="address" value="{default_address}" placeholder="C9:48:8A:69:D5:C0">
try:
template_path = Path(__file__).parent / "index.html"
template = template_path.read_text(encoding="utf-8")
except FileNotFoundError:
return "<h1>Error: index.html not found</h1>"
<div class="row">
<div>
<label for="transport">Transport</label>
<select id="transport">
<option value="ble"{" selected" if default_transport == "ble" else ""}>BLE</option>
<option value="classic"{" selected" if default_transport == "classic" else ""}>Classic</option>
</select>
</div>
<div>
<label for="channel">RFCOMM channel</label>
<input id="channel" type="number" min="1" max="30" value="{_DEFAULT_CHANNEL}">
</div>
</div>
<div class="actions">
<button type="button" class="alt" onclick="runGet('status')">Get Status</button>
<button type="button" class="alt" onclick="runGet('info')">Get Info</button>
</div>
</div>
<div class="card">
<h2>Output</h2>
<pre id="output">Ready.</pre>
</div>
<div class="card">
<h2>Print Text</h2>
<label for="text">Text</label>
<textarea id="text" placeholder="Hello from Home Assistant"></textarea>
<div class="row">
<div>
<label for="text_density">Density</label>
<select id="text_density">
<option value="0">0</option>
<option value="1">1</option>
<option value="2" selected>2</option>
</select>
</div>
<div>
<label for="text_copies">Copies</label>
<input id="text_copies" type="number" min="1" max="99" value="1">
</div>
</div>
<div class="row">
<div>
<label for="text_font_size">Font size</label>
<input id="text_font_size" type="number" min="6" max="200" value="30">
</div>
<div>
<label for="text_label_length">Label length (mm)</label>
<input id="text_label_length" type="number" min="5" max="500" value="30">
</div>
</div>
<label for="text_paper">Paper</label>
<select id="text_paper">
<option value="gap" selected>gap</option>
<option value="black">black</option>
<option value="continuous">continuous</option>
</select>
<div class="actions">
<button type="button" onclick="printText()">Print Text</button>
</div>
</div>
<div class="card">
<h2>Print Image</h2>
<label for="image_file">Image file</label>
<input id="image_file" type="file" accept="image/*">
<div class="row">
<div>
<label for="image_density">Density</label>
<select id="image_density">
<option value="0">0</option>
<option value="1">1</option>
<option value="2" selected>2</option>
</select>
</div>
<div>
<label for="image_copies">Copies</label>
<input id="image_copies" type="number" min="1" max="99" value="1">
</div>
</div>
<div class="row">
<div>
<label for="image_label_length">Label length (mm)</label>
<input id="image_label_length" type="number" min="5" max="500" value="30">
</div>
<div class="inline">
<input id="image_dither" type="checkbox" checked>
<label for="image_dither">Enable dithering</label>
</div>
</div>
<label for="image_paper">Paper</label>
<select id="image_paper">
<option value="gap" selected>gap</option>
<option value="black">black</option>
<option value="continuous">continuous</option>
</select>
<div class="actions">
<button type="button" onclick="printImage()">Print Image</button>
</div>
</div>
</section>
</main>
<script>
function commonParams() {{
const address = document.getElementById("address").value.trim();
const classic = document.getElementById("transport").value === "classic";
const channel = document.getElementById("channel").value;
const params = new URLSearchParams();
if (address) params.set("address", address);
params.set("classic", String(classic));
params.set("channel", channel);
return params;
}}
async function showResponse(response) {{
const output = document.getElementById("output");
let data;
try {{
data = await response.json();
}} catch {{
data = {{ detail: await response.text() }};
}}
output.textContent = JSON.stringify({{ status: response.status, ok: response.ok, data }}, null, 2);
}}
async function runGet(path) {{
const response = await fetch(`${{path}}?${{commonParams().toString()}}`);
await showResponse(response);
}}
async function printText() {{
const form = new FormData();
form.set("text", document.getElementById("text").value);
form.set("density", document.getElementById("text_density").value);
form.set("copies", document.getElementById("text_copies").value);
form.set("font_size", document.getElementById("text_font_size").value);
form.set("label_length", document.getElementById("text_label_length").value);
form.set("paper", document.getElementById("text_paper").value);
form.set("address", document.getElementById("address").value.trim());
form.set("classic", String(document.getElementById("transport").value === "classic"));
form.set("channel", document.getElementById("channel").value);
const response = await fetch("print/text", {{ method: "POST", body: form }});
await showResponse(response);
}}
async function printImage() {{
const fileInput = document.getElementById("image_file");
if (!fileInput.files.length) {{
document.getElementById("output").textContent = "Select an image file first.";
return;
}}
const form = new FormData();
form.set("file", fileInput.files[0]);
form.set("density", document.getElementById("image_density").value);
form.set("copies", document.getElementById("image_copies").value);
form.set("label_length", document.getElementById("image_label_length").value);
form.set("paper", document.getElementById("image_paper").value);
form.set("dither", String(document.getElementById("image_dither").checked));
form.set("address", document.getElementById("address").value.trim());
form.set("classic", String(document.getElementById("transport").value === "classic"));
form.set("channel", document.getElementById("channel").value);
const response = await fetch("print/image", {{ method: "POST", body: form }});
await showResponse(response);
}}
</script>
</body>
</html>"""
# Simple substitution for initial values
return (
template.replace("{default_address}", default_address)
.replace("{ble_selected}", " selected" if default_transport == "ble" else "")
.replace("{classic_selected}", " selected" if default_transport == "classic" else "")
.replace("{default_channel}", str(_DEFAULT_CHANNEL))
)
# ---------------------------------------------------------------------------
@@ -468,6 +190,90 @@ async def get_info(
return info
@app.post(
"/pair",
summary="Pair and trust a Bluetooth device",
status_code=200,
)
async def pair_device(
address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None,
):
"""
Attempt to pair and trust the device using `bluetoothctl`.
This is intended for setting up Classic Bluetooth connections.
"""
addr = _address(address)
if not addr:
raise HTTPException(status_code=422, detail="Address is required to pair.")
# Basic validation for MAC address format to mitigate injection risk.
if not re.match(r"^[0-9A-F]{2}(:[0-9A-F]{2}){5}$", addr, re.IGNORECASE):
raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}")
cmd = f'echo -e "pair {addr}\\ntrust {addr}\\nquit" | bluetoothctl'
try:
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15.0)
except FileNotFoundError:
raise HTTPException(status_code=500, detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?")
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="`bluetoothctl` command timed out after 15 seconds.")
output = stdout.decode(errors="ignore")
error = stderr.decode(errors="ignore")
if "Failed to pair" in output or "not available" in output.lower():
raise HTTPException(status_code=502, detail=f"Pairing failed. Output: {output}. Error: {error}")
return {"ok": True, "message": "Pair/trust command sent. Check output for details.", "output": output, "error": error}
@app.post(
"/unpair",
summary="Unpair a Bluetooth device",
status_code=200,
)
async def unpair_device(
address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None,
):
"""
Attempt to unpair the device using `bluetoothctl`.
"""
addr = _address(address)
if not addr:
raise HTTPException(status_code=422, detail="Address is required to unpair.")
# Basic validation for MAC address format to mitigate injection risk.
if not re.match(r"^[0-9A-F]{2}(:[0-9A-F]{2}){5}$", addr, re.IGNORECASE):
raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}")
cmd = f'echo -e "remove {addr}\\nquit" | bluetoothctl'
try:
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15.0)
except FileNotFoundError:
raise HTTPException(status_code=500, detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?")
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="`bluetoothctl` command timed out after 15 seconds.")
output = stdout.decode(errors="ignore")
error = stderr.decode(errors="ignore")
if "Failed to remove" in output or "not available" in output.lower():
raise HTTPException(status_code=502, detail=f"Unpairing failed. Output: {output}. Error: {error}")
return {"ok": True, "message": "Unpair command sent. Check output for details.", "output": output, "error": error}
@app.post(
"/print/text",
summary="Print a text label",

View File

@@ -1,251 +0,0 @@
"""CLI for Fichero D11s thermal label printer."""
import argparse
import asyncio
import os
import sys
from PIL import Image
from fichero.imaging import image_to_raster, prepare_image, text_to_image
from fichero.printer import (
BYTES_PER_ROW,
DELAY_AFTER_DENSITY,
DELAY_AFTER_FEED,
DELAY_COMMAND_GAP,
DELAY_RASTER_SETTLE,
PAPER_GAP,
PrinterClient,
PrinterError,
PrinterNotReady,
connect,
)
DOTS_PER_MM = 8 # 203 DPI
def _resolve_label_height(args: argparse.Namespace) -> int:
"""Return label height in pixels from --label-length (mm) or --label-height (px)."""
if args.label_length is not None:
return args.label_length * DOTS_PER_MM
return args.label_height
async def do_print(
pc: PrinterClient,
img: Image.Image,
density: int = 1,
paper: int = PAPER_GAP,
copies: int = 1,
dither: bool = True,
max_rows: int = 240,
) -> bool:
img = prepare_image(img, max_rows=max_rows, dither=dither)
rows = img.height
raster = image_to_raster(img)
print(f" Image: {img.width}x{rows}, {len(raster)} bytes, {copies} copies")
await pc.set_density(density)
await asyncio.sleep(DELAY_AFTER_DENSITY)
for copy_num in range(copies):
if copies > 1:
print(f" Copy {copy_num + 1}/{copies}...")
# Check status before each copy (matches decompiled app behaviour)
status = await pc.get_status()
if not status.ok:
raise PrinterNotReady(f"Printer not ready: {status}")
# AiYin print sequence (from decompiled APK)
await pc.set_paper_type(paper)
await asyncio.sleep(DELAY_COMMAND_GAP)
await pc.wakeup()
await asyncio.sleep(DELAY_COMMAND_GAP)
await pc.enable()
await asyncio.sleep(DELAY_COMMAND_GAP)
# Raster image: GS v 0 m xL xH yL yH <data>
yl = rows & 0xFF
yh = (rows >> 8) & 0xFF
header = bytes([0x1D, 0x76, 0x30, 0x00, BYTES_PER_ROW, 0x00, yl, yh])
await pc.send_chunked(header + raster)
await asyncio.sleep(DELAY_RASTER_SETTLE)
await pc.form_feed()
await asyncio.sleep(DELAY_AFTER_FEED)
ok = await pc.stop_print()
if not ok:
print(" WARNING: no OK/0xAA from stop command")
return True
async def cmd_info(args: argparse.Namespace) -> None:
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
info = await pc.get_info()
for k, v in info.items():
print(f" {k}: {v}")
print()
all_info = await pc.get_all_info()
for k, v in all_info.items():
print(f" {k}: {v}")
async def cmd_status(args: argparse.Namespace) -> None:
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
status = await pc.get_status()
print(f" Status: {status}")
print(f" Raw: 0x{status.raw:02X} ({status.raw:08b})")
print(f" printing={status.printing} cover_open={status.cover_open} "
f"no_paper={status.no_paper} low_battery={status.low_battery} "
f"overheated={status.overheated} charging={status.charging}")
async def cmd_text(args: argparse.Namespace) -> None:
text = " ".join(args.text)
label_h = _resolve_label_height(args)
img = text_to_image(text, font_size=args.font_size, label_height=label_h)
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
print(f'Printing "{text}"...')
ok = await do_print(pc, img, args.density, paper=args.paper,
copies=args.copies, dither=False, max_rows=label_h)
print("Done." if ok else "FAILED.")
async def cmd_image(args: argparse.Namespace) -> None:
img = Image.open(args.path)
label_h = _resolve_label_height(args)
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
print(f"Printing {args.path}...")
ok = await do_print(pc, img, args.density, paper=args.paper,
copies=args.copies, dither=not args.no_dither,
max_rows=label_h)
print("Done." if ok else "FAILED.")
async def cmd_set(args: argparse.Namespace) -> None:
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
if args.setting == "density":
val = int(args.value)
if not 0 <= val <= 2:
print(" ERROR: density must be 0, 1, or 2")
return
ok = await pc.set_density(val)
print(f" Set density={args.value}: {'OK' if ok else 'FAILED'}")
elif args.setting == "shutdown":
val = int(args.value)
if not 1 <= val <= 480:
print(" ERROR: shutdown must be 1-480 minutes")
return
ok = await pc.set_shutdown_time(val)
print(f" Set shutdown={args.value}min: {'OK' if ok else 'FAILED'}")
elif args.setting == "paper":
types = {"gap": 0, "black": 1, "continuous": 2}
if args.value in types:
val = types[args.value]
else:
try:
val = int(args.value)
except ValueError:
print(" ERROR: paper must be gap, black, continuous, or 0-2")
return
if not 0 <= val <= 2:
print(" ERROR: paper must be gap, black, continuous, or 0-2")
return
ok = await pc.set_paper_type(val)
print(f" Set paper={args.value}: {'OK' if ok else 'FAILED'}")
def _add_paper_arg(parser: argparse.ArgumentParser) -> None:
"""Add --paper argument to a subparser."""
parser.add_argument(
"--paper", type=str, default="gap",
help="Paper type: gap (default), black, continuous",
)
def _parse_paper(value: str) -> int:
"""Convert paper string/int to protocol value."""
types = {"gap": 0, "black": 1, "continuous": 2}
if value in types:
return types[value]
try:
val = int(value)
if 0 <= val <= 2:
return val
except ValueError:
pass
print(f" WARNING: unknown paper type '{value}', using gap")
return 0
def main() -> None:
parser = argparse.ArgumentParser(description="Fichero D11s Label Printer")
parser.add_argument("--address", default=os.environ.get("FICHERO_ADDR"),
help="BLE address (skip scanning, or set FICHERO_ADDR)")
parser.add_argument("--classic", action="store_true",
default=os.environ.get("FICHERO_TRANSPORT", "").lower() == "classic",
help="Use Classic Bluetooth (RFCOMM) instead of BLE (Linux only, "
"or set FICHERO_TRANSPORT=classic)")
parser.add_argument("--channel", type=int, default=1,
help="RFCOMM channel (default: 1, only used with --classic)")
sub = parser.add_subparsers(dest="command", required=True)
p_info = sub.add_parser("info", help="Show device info")
p_info.set_defaults(func=cmd_info)
p_status = sub.add_parser("status", help="Show detailed status")
p_status.set_defaults(func=cmd_status)
p_text = sub.add_parser("text", help="Print text label")
p_text.add_argument("text", nargs="+", help="Text to print")
p_text.add_argument("--density", type=int, default=2, choices=[0, 1, 2],
help="Print density: 0=light, 1=medium, 2=thick")
p_text.add_argument("--copies", type=int, default=1, help="Number of copies")
p_text.add_argument("--font-size", type=int, default=30, help="Font size in points")
p_text.add_argument("--label-length", type=int, default=None,
help="Label length in mm (default: 30mm)")
p_text.add_argument("--label-height", type=int, default=240,
help="Label height in pixels (default: 240, prefer --label-length)")
_add_paper_arg(p_text)
p_text.set_defaults(func=cmd_text)
p_image = sub.add_parser("image", help="Print image file")
p_image.add_argument("path", help="Path to image file")
p_image.add_argument("--density", type=int, default=2, choices=[0, 1, 2],
help="Print density: 0=light, 1=medium, 2=thick")
p_image.add_argument("--copies", type=int, default=1, help="Number of copies")
p_image.add_argument("--no-dither", action="store_true",
help="Disable Floyd-Steinberg dithering (use simple threshold)")
p_image.add_argument("--label-length", type=int, default=None,
help="Label length in mm (default: 30mm)")
p_image.add_argument("--label-height", type=int, default=240,
help="Max image height in pixels (default: 240, prefer --label-length)")
_add_paper_arg(p_image)
p_image.set_defaults(func=cmd_image)
p_set = sub.add_parser("set", help="Change printer settings")
p_set.add_argument("setting", choices=["density", "shutdown", "paper"],
help="Setting to change")
p_set.add_argument("value", help="New value")
p_set.set_defaults(func=cmd_set)
args = parser.parse_args()
# Resolve --paper string to int for print commands
if hasattr(args, "paper") and isinstance(args.paper, str):
args.paper = _parse_paper(args.paper)
try:
asyncio.run(args.func(args))
except PrinterError as e:
print(f" ERROR: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,97 +0,0 @@
"""Image processing for Fichero D11s thermal label printer."""
import logging
import numpy as np
from PIL import Image, ImageDraw, ImageFont, ImageOps
from fichero.printer import PRINTHEAD_PX
log = logging.getLogger(__name__)
def floyd_steinberg_dither(img: Image.Image) -> Image.Image:
"""Floyd-Steinberg error-diffusion dithering to 1-bit.
Same algorithm as PrinterImageProcessor.ditherFloydSteinberg() in the
decompiled Fichero APK: distributes quantisation error to neighbouring
pixels with weights 7/16, 3/16, 5/16, 1/16.
"""
arr = np.array(img, dtype=np.float32)
h, w = arr.shape
for y in range(h):
for x in range(w):
old = arr[y, x]
new = 0.0 if old < 128 else 255.0
arr[y, x] = new
err = old - new
if x + 1 < w:
arr[y, x + 1] += err * 7 / 16
if y + 1 < h:
if x - 1 >= 0:
arr[y + 1, x - 1] += err * 3 / 16
arr[y + 1, x] += err * 5 / 16
if x + 1 < w:
arr[y + 1, x + 1] += err * 1 / 16
arr = np.clip(arr, 0, 255).astype(np.uint8)
return Image.fromarray(arr, mode="L")
def prepare_image(
img: Image.Image, max_rows: int = 240, dither: bool = True
) -> Image.Image:
"""Convert any image to 96px wide, 1-bit, black on white.
When *dither* is True (default), uses Floyd-Steinberg error diffusion
for better quality on photos and gradients. Set False for crisp text.
"""
img = img.convert("L")
w, h = img.size
new_h = int(h * (PRINTHEAD_PX / w))
img = img.resize((PRINTHEAD_PX, new_h), Image.LANCZOS)
if new_h > max_rows:
log.warning("Image height %dpx exceeds max %dpx, cropping bottom", new_h, max_rows)
img = img.crop((0, 0, PRINTHEAD_PX, max_rows))
img = ImageOps.autocontrast(img, cutoff=1)
if dither:
img = floyd_steinberg_dither(img)
# Pack to 1-bit. PIL mode "1" tobytes() uses 0-bit=black, 1-bit=white,
# but the printer wants 1-bit=black. Mapping dark->1 via point() inverts
# the PIL convention so the final packed bits match what the printer needs.
img = img.point(lambda x: 1 if x < 128 else 0, "1")
return img
def image_to_raster(img: Image.Image) -> bytes:
"""Pack 1-bit image into raw raster bytes, MSB first."""
if img.mode != "1":
raise ValueError(f"Expected mode '1', got '{img.mode}'")
if img.width != PRINTHEAD_PX:
raise ValueError(f"Expected width {PRINTHEAD_PX}, got {img.width}")
return img.tobytes()
def text_to_image(text: str, font_size: int = 30, label_height: int = 240) -> Image.Image:
"""Render crisp 1-bit text, rotated 90 degrees for label printing."""
canvas_w = label_height
canvas_h = PRINTHEAD_PX
img = Image.new("L", (canvas_w, canvas_h), 255)
draw = ImageDraw.Draw(img)
draw.fontmode = "1" # disable antialiasing - pure 1-bit glyph rendering
font = ImageFont.load_default(size=font_size)
bbox = draw.textbbox((0, 0), text, font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
x = (canvas_w - tw) // 2 - bbox[0]
y = (canvas_h - th) // 2 - bbox[1]
draw.text((x, y), text, fill=0, font=font)
img = img.rotate(90, expand=True)
return img

View File

@@ -1,467 +0,0 @@
"""
Fichero / D11s thermal label printer - BLE + Classic Bluetooth interface.
Protocol reverse-engineered from decompiled Fichero APK (com.lj.fichero).
Device class: AiYinNormalDevice (LuckPrinter SDK)
96px wide printhead (12 bytes/row), 203 DPI, prints 1-bit raster images.
"""
import asyncio
import sys
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from bleak import BleakClient, BleakGATTCharacteristic, BleakScanner
from bleak.exc import BleakDBusError, BleakError
# --- RFCOMM (Classic Bluetooth) support - Linux + Windows (Python 3.9+) ---
_RFCOMM_AVAILABLE = False
if sys.platform in ("linux", "win32"):
import socket as _socket
_RFCOMM_AVAILABLE = hasattr(_socket, "AF_BLUETOOTH")
RFCOMM_CHANNEL = 1
# --- BLE identifiers ---
PRINTER_NAME_PREFIXES = ("FICHERO", "D11s_")
# Using the 18f0 service (any of the four BLE UART services work)
WRITE_UUID = "00002af1-0000-1000-8000-00805f9b34fb"
NOTIFY_UUID = "00002af0-0000-1000-8000-00805f9b34fb"
# --- Printhead ---
PRINTHEAD_PX = 96
BYTES_PER_ROW = PRINTHEAD_PX // 8 # 12
CHUNK_SIZE_BLE = 200 # BLE MTU-limited
CHUNK_SIZE_CLASSIC = 16384 # from decompiled app (C1703d.java), stream-based
# --- Paper types for 10 FF 84 nn ---
PAPER_GAP = 0x00
PAPER_BLACK_MARK = 0x01
PAPER_CONTINUOUS = 0x02
# --- Timing (seconds) - empirically tuned against D11s fw 2.4.6 ---
DELAY_AFTER_DENSITY = 0.10 # printer needs time to apply density setting
DELAY_COMMAND_GAP = 0.05 # minimum gap between sequential commands
DELAY_CHUNK_GAP = 0.02 # inter-chunk pacing for BLE throughput
DELAY_RASTER_SETTLE = 0.50 # wait for printhead after raster transfer
DELAY_AFTER_FEED = 0.30 # wait after form feed before stop command
DELAY_NOTIFY_EXTRA = 0.05 # extra wait for trailing BLE notification fragments
BLE_CONNECT_RETRIES = 3 # retry transient BLE connect failures
BLE_CONNECT_BACKOFF = 0.7 # base backoff in seconds (linear: n * base)
# --- Exceptions ---
class PrinterError(Exception):
"""Base exception for printer operations."""
class PrinterNotFound(PrinterError):
"""No Fichero/D11s printer found during BLE scan."""
class PrinterTimeout(PrinterError):
"""Printer did not respond within the expected time."""
class PrinterNotReady(PrinterError):
"""Printer status indicates it cannot print."""
# --- Discovery ---
async def find_printer() -> str:
"""Scan BLE for a Fichero/D11s printer. Returns the address."""
print("Scanning for printer...")
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.name and any(d.name.startswith(p) for p in PRINTER_NAME_PREFIXES):
print(f" Found {d.name} at {d.address}")
return d.address
raise PrinterNotFound("No Fichero/D11s printer found. Is it turned on?")
async def resolve_ble_target(address: str | None = None):
"""Resolve a BLE target as Bleak device object when possible.
Passing a discovered device object to BleakClient helps BlueZ keep the
correct LE context for dual-mode environments.
"""
if address:
device = await BleakScanner.find_device_by_address(address, timeout=8.0)
return device or address
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.name and any(d.name.startswith(p) for p in PRINTER_NAME_PREFIXES):
print(f" Found {d.name} at {d.address}")
return d
raise PrinterNotFound("No Fichero/D11s printer found. Is it turned on?")
# --- Status ---
class PrinterStatus:
"""Parsed status byte from 10 FF 40."""
def __init__(self, byte: int):
self.raw = byte
self.printing = bool(byte & 0x01)
self.cover_open = bool(byte & 0x02)
self.no_paper = bool(byte & 0x04)
self.low_battery = bool(byte & 0x08)
self.overheated = bool(byte & 0x10 or byte & 0x40)
self.charging = bool(byte & 0x20)
def __str__(self) -> str:
flags = []
if self.printing:
flags.append("printing")
if self.cover_open:
flags.append("cover open")
if self.no_paper:
flags.append("no paper")
if self.low_battery:
flags.append("low battery")
if self.overheated:
flags.append("overheated")
if self.charging:
flags.append("charging")
return ", ".join(flags) if flags else "ready"
@property
def ok(self) -> bool:
return not (self.cover_open or self.no_paper or self.overheated)
# --- RFCOMM client (duck-types the BleakClient interface) ---
class RFCOMMClient:
"""Classic Bluetooth (RFCOMM) transport. Linux + Windows (Python 3.9+).
Implements the same async context manager + write_gatt_char/start_notify
interface that PrinterClient expects from BleakClient. Zero dependencies
beyond stdlib.
"""
is_classic = True # transport marker for PrinterClient chunk sizing
def __init__(self, address: str, channel: int = RFCOMM_CHANNEL):
self._address = address
self._channel = channel
self._sock: "_socket.socket | None" = None
self._reader_task: asyncio.Task | None = None
async def __aenter__(self) -> "RFCOMMClient":
if not _RFCOMM_AVAILABLE:
raise PrinterError(
"RFCOMM transport requires socket.AF_BLUETOOTH "
"(Linux with BlueZ, or Windows with Python 3.9+). "
"Not available on this platform."
)
import socket as _socket
sock = _socket.socket(
_socket.AF_BLUETOOTH, _socket.SOCK_STREAM, _socket.BTPROTO_RFCOMM
)
loop = asyncio.get_running_loop()
try:
# uvloop's sock_connect path goes through getaddrinfo and doesn't
# support AF_BLUETOOTH addresses reliably. Use direct socket connect
# in a thread instead.
sock.settimeout(10.0)
await loop.run_in_executor(
None,
sock.connect,
(self._address, self._channel),
)
sock.setblocking(False)
except asyncio.TimeoutError as exc:
sock.close()
raise PrinterTimeout(
f"Classic Bluetooth connection timed out to {self._address} (channel {self._channel})."
) from exc
except OSError as exc:
sock.close()
raise PrinterError(
f"Classic Bluetooth connection failed for '{self._address}' (channel {self._channel}): {exc}"
) from exc
except Exception:
sock.close()
raise
self._sock = sock
return self
async def __aexit__(self, *exc) -> None:
if self._reader_task is not None:
self._reader_task.cancel()
try:
await self._reader_task
except asyncio.CancelledError:
pass
self._reader_task = None
if self._sock is not None:
self._sock.close()
self._sock = None
async def write_gatt_char(self, _uuid: str, data: bytes, response: bool = False) -> None:
loop = asyncio.get_running_loop()
await loop.sock_sendall(self._sock, data)
async def start_notify(self, _uuid: str, callback) -> None:
self._reader_task = asyncio.create_task(self._reader_loop(callback))
async def _reader_loop(self, callback) -> None:
loop = asyncio.get_running_loop()
while True:
try:
data = await loop.sock_recv(self._sock, 1024)
except (OSError, asyncio.CancelledError):
return
if not data:
return
callback(None, bytearray(data))
# --- Client ---
class PrinterClient:
def __init__(self, client: BleakClient):
self.client = client
self._buf = bytearray()
self._event = asyncio.Event()
self._lock = asyncio.Lock()
self._is_classic = getattr(client, "is_classic", False)
def _on_notify(self, _char: BleakGATTCharacteristic, data: bytearray) -> None:
self._buf.extend(data)
self._event.set()
async def start(self) -> None:
await self.client.start_notify(NOTIFY_UUID, self._on_notify)
async def send(self, data: bytes, wait: bool = False, timeout: float = 2.0) -> bytes:
async with self._lock:
if wait:
self._buf.clear()
self._event.clear()
await self.client.write_gatt_char(WRITE_UUID, data, response=False)
if wait:
try:
await asyncio.wait_for(self._event.wait(), timeout=timeout)
await asyncio.sleep(DELAY_NOTIFY_EXTRA)
except asyncio.TimeoutError:
raise PrinterTimeout(f"No response within {timeout}s")
return bytes(self._buf)
async def send_chunked(self, data: bytes, chunk_size: int | None = None) -> None:
if chunk_size is None:
chunk_size = CHUNK_SIZE_CLASSIC if self._is_classic else CHUNK_SIZE_BLE
delay = 0 if self._is_classic else DELAY_CHUNK_GAP
async with self._lock:
for i in range(0, len(data), chunk_size):
chunk = data[i : i + chunk_size]
await self.client.write_gatt_char(WRITE_UUID, chunk, response=False)
if delay:
await asyncio.sleep(delay)
# --- Info commands (all tested and confirmed on D11s fw 2.4.6) ---
async def get_model(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF0]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_firmware(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF1]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_serial(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF2]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_boot_version(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xEF]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_battery(self) -> int:
r = await self.send(bytes([0x10, 0xFF, 0x50, 0xF1]), wait=True)
if r and len(r) >= 2:
return r[-1]
return -1
async def get_status(self) -> PrinterStatus:
r = await self.send(bytes([0x10, 0xFF, 0x40]), wait=True)
if r:
return PrinterStatus(r[-1])
return PrinterStatus(0xFF)
async def get_density(self) -> bytes:
r = await self.send(bytes([0x10, 0xFF, 0x11]), wait=True)
return r
async def get_shutdown_time(self) -> int:
"""Returns auto-shutdown timeout in minutes."""
r = await self.send(bytes([0x10, 0xFF, 0x13]), wait=True)
if r and len(r) >= 2:
return (r[0] << 8) | r[1]
return -1
async def get_all_info(self) -> dict:
"""10 FF 70: returns pipe-delimited string with all device info."""
r = await self.send(bytes([0x10, 0xFF, 0x70]), wait=True)
if not r:
return {}
parts = r.decode(errors="replace").split("|")
if len(parts) >= 6:
return {
"bt_name": parts[0],
"mac_classic": parts[1],
"mac_ble": parts[2],
"firmware": parts[3],
"serial": parts[4],
"battery": f"{parts[5]}%",
}
return {"raw": r.decode(errors="replace")}
# --- Config commands (tested on D11s) ---
async def set_density(self, level: int) -> bool:
"""0=light, 1=medium, 2=thick. Returns True if printer responded OK."""
r = await self.send(bytes([0x10, 0xFF, 0x10, 0x00, level]), wait=True)
return r == b"OK"
async def set_paper_type(self, paper: int = PAPER_GAP) -> bool:
"""0=gap/label, 1=black mark, 2=continuous."""
r = await self.send(bytes([0x10, 0xFF, 0x84, paper]), wait=True)
return r == b"OK"
async def set_shutdown_time(self, minutes: int) -> bool:
hi = (minutes >> 8) & 0xFF
lo = minutes & 0xFF
r = await self.send(bytes([0x10, 0xFF, 0x12, hi, lo]), wait=True)
return r == b"OK"
async def factory_reset(self) -> bool:
r = await self.send(bytes([0x10, 0xFF, 0x04]), wait=True)
return r == b"OK"
# --- Print control (AiYin-specific, from decompiled APK) ---
async def wakeup(self) -> None:
await self.send(b"\x00" * 12)
async def enable(self) -> None:
"""AiYin enable: 10 FF FE 01 (NOT 10 FF F1 03)."""
await self.send(bytes([0x10, 0xFF, 0xFE, 0x01]))
async def feed_dots(self, dots: int) -> None:
"""Feed paper forward by n dots."""
await self.send(bytes([0x1B, 0x4A, dots & 0xFF]))
async def form_feed(self) -> None:
"""Position to next label."""
await self.send(bytes([0x1D, 0x0C]))
async def stop_print(self) -> bool:
"""AiYin stop: 10 FF FE 45. Waits for 0xAA or 'OK'."""
r = await self.send(bytes([0x10, 0xFF, 0xFE, 0x45]), wait=True, timeout=60.0)
if r:
return r[0] == 0xAA or r.startswith(b"OK")
return False
async def get_info(self) -> dict:
status = await self.get_status()
return {
"model": await self.get_model(),
"firmware": await self.get_firmware(),
"boot": await self.get_boot_version(),
"serial": await self.get_serial(),
"battery": f"{await self.get_battery()}%",
"status": str(status),
"shutdown": f"{await self.get_shutdown_time()} min",
}
@asynccontextmanager
async def connect(
address: str | None = None,
classic: bool = False,
channel: int = RFCOMM_CHANNEL,
) -> AsyncGenerator[PrinterClient, None]:
"""Discover printer, connect, and yield a ready PrinterClient."""
if classic:
if not address:
raise PrinterError("--address is required for Classic Bluetooth (no scanning)")
# D11s variants are commonly exposed on channel 1 or 3.
candidates = [channel, 1, 2, 3]
channels = [ch for i, ch in enumerate(candidates) if ch > 0 and ch not in candidates[:i]]
last_exc: Exception | None = None
for ch in channels:
try:
async with RFCOMMClient(address, ch) as client:
pc = PrinterClient(client)
await pc.start()
yield pc
return
except (PrinterError, PrinterTimeout) as exc:
last_exc = exc
if last_exc is not None:
raise PrinterError(
f"Classic Bluetooth connection failed for '{address}'. "
f"Tried channels: {channels}. Last error: {last_exc}"
) from last_exc
raise PrinterError(f"Classic Bluetooth connection failed for '{address}'.")
else:
target = await resolve_ble_target(address)
def _is_retryable_ble_error(exc: Exception) -> bool:
msg = str(exc).lower()
return any(token in msg for token in ("timeout", "timed out", "br-connection-timeout"))
last_exc: Exception | None = None
for attempt in range(1, BLE_CONNECT_RETRIES + 1):
try:
async with BleakClient(target) as client:
pc = PrinterClient(client)
await pc.start()
yield pc
return
except asyncio.TimeoutError as exc:
last_exc = exc
if attempt < BLE_CONNECT_RETRIES:
await asyncio.sleep(BLE_CONNECT_BACKOFF * attempt)
continue
raise PrinterError(f"BLE connection timed out: {exc}") from exc
except BleakDBusError as exc:
msg = str(exc).lower()
if "br-connection-not-supported" in msg:
raise PrinterError(
"BLE connection failed (br-connection-not-supported). "
"Try Classic Bluetooth with classic=true and channel=1."
) from exc
last_exc = exc
if _is_retryable_ble_error(exc) and attempt < BLE_CONNECT_RETRIES:
await asyncio.sleep(BLE_CONNECT_BACKOFF * attempt)
continue
raise PrinterError(f"BLE connection failed: {exc}") from exc
except BleakError as exc:
last_exc = exc
if _is_retryable_ble_error(exc) and attempt < BLE_CONNECT_RETRIES:
await asyncio.sleep(BLE_CONNECT_BACKOFF * attempt)
continue
raise PrinterError(f"BLE error: {exc}") from exc
if last_exc is not None:
raise PrinterError(
f"BLE connection failed after {BLE_CONNECT_RETRIES} attempts: {last_exc}"
) from last_exc
raise PrinterError("BLE connection failed for unknown reason.")

View File

@@ -1,5 +1,52 @@
# Changelog
## 0.1.27
- The `fichero` library is now part of the add-on directory, simplifying the build process and removing the need for synchronization scripts.
## 0.1.24
- Fixed Docker build failures by reverting to a vendored code approach. The add-on now expects the `fichero` library to be present within its directory during the build.
## 0.1.23
- Updated `Dockerfile` to install the main library via `pip` instead of copying source files, completing the refactoring to eliminate duplicated code.
## 0.1.22
- Refactored build process to install the main `fichero-printer` library as a package instead of using duplicated source files. This resolves issues with stale code.
## 0.1.21
- Fixed stale source code issue by synchronizing the add-on's internal `fichero` package with the latest library version.
## 0.1.20
- Refactored the embedded web UI to be loaded from an external `index.html` file.
## 0.1.19
- Added "Unpair Device" button to the web UI.
## 0.1.18
- Added "Pair Device" button to the web UI.
## 0.1.16
- Added automatic fallback to BLE if Classic Bluetooth fails with `[Errno 12] Out of memory`.
## 0.1.15
- Added a BLE recovery retry for `br-connection-not-supported` that forces fresh LE target resolution from scan results before failing.
## 0.1.14
- Prevented BLE fallback to raw MAC connects and now require discovered LE device resolution, reducing `br-connection-not-supported` regressions on some BlueZ hosts.
## 0.1.13
- Marked BLE service-discovery disconnect errors as retryable (`failed to discover services, device disconnected`), so the add-on retries automatically.
## 0.1.12
- Improved BLE connection target resolution by preferring discovered BLE device objects over raw MAC strings to avoid BlueZ `br-connection-not-supported` on some hosts.

View File

@@ -1,28 +1,32 @@
ARG BUILD_FROM
FROM $BUILD_FROM
# Only dbus-dev needed to talk to the HOST BlueZ via D-Bus (host_dbus: true).
# Install system dependencies.
# build-base is for compiling Python packages (numpy, pillow).
# dbus-dev is for Bleak to communicate with the host's BlueZ.
# Do NOT install bluez here - we use the host BlueZ, not our own.
RUN apk add --no-cache \
bash \
python3 \
py3-pip \
py3-numpy \
py3-pillow \
dbus-dev
dbus-dev \
build-base
# Pure-Python packages (bleak uses dbus-fast internally, no C compiler needed)
# Install Python dependencies from pip.
# We cannot use `pip install .` from pyproject.toml as it's outside the build context.
RUN pip3 install --no-cache-dir --break-system-packages \
"bleak>=0.21" \
"fastapi>=0.111" \
"uvicorn[standard]>=0.29" \
"bleak" \
"numpy" \
"Pillow" \
"fastapi" \
"uvicorn[standard]" \
"python-multipart>=0.0.9"
# Copy the fichero Python package into the container
# Copy the application code into the container.
WORKDIR /app
COPY fichero/ /app/fichero/
# Make the package importable without installation
# Make the 'fichero' package importable.
ENV PYTHONPATH=/app
# Copy startup script and normalise line endings (Windows CRLF -> LF)

View File

@@ -1,5 +1,5 @@
name: "Fichero Printer"
version: "0.1.12"
version: "0.1.27"
slug: "fichero_printer"
description: "REST API for the Fichero D11s (AiYin) thermal label printer over Bluetooth"
url: "https://git.leuschner.dev/Tobias/Fichero"

View File

@@ -5,6 +5,7 @@ Start with:
or:
python -m fichero.api
Endpoints:
GET /status Printer status
GET /info Printer info (model, firmware, battery, …)
@@ -17,8 +18,10 @@ from __future__ import annotations
import argparse
import asyncio
import io
import re
import os
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Annotated
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
@@ -35,7 +38,6 @@ from fichero.printer import (
PrinterNotFound,
PrinterTimeout,
connect,
find_printer,
)
# ---------------------------------------------------------------------------
@@ -73,7 +75,7 @@ async def lifespan(app: FastAPI): # noqa: ARG001
app = FastAPI(
title="Fichero Printer API",
description="REST API for the Fichero D11s (AiYin) thermal label printer.",
version="0.1.9",
version="0.1.26",
lifespan=lifespan,
docs_url=None,
redoc_url=None,
@@ -95,327 +97,20 @@ def _address(address: str | None) -> str | None:
def _ui_html() -> str:
default_address = _DEFAULT_ADDRESS or ""
default_transport = "classic" if _DEFAULT_CLASSIC else "ble"
return f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Fichero Printer</title>
<style>
:root {{
--bg: #f4efe6;
--panel: #fffaf2;
--line: #d8cdbd;
--ink: #2d241d;
--muted: #6c6258;
--accent: #b55e33;
--accent-2: #245b4b;
}}
* {{ box-sizing: border-box; }}
body {{
margin: 0;
font-family: "Noto Sans", system-ui, sans-serif;
color: var(--ink);
background:
radial-gradient(circle at top left, #fff8ed 0, transparent 35%),
linear-gradient(180deg, #efe4d3 0%, var(--bg) 100%);
}}
main {{
max-width: 980px;
margin: 0 auto;
padding: 24px 16px 40px;
}}
.hero {{
margin-bottom: 20px;
padding: 24px;
border: 1px solid var(--line);
border-radius: 18px;
background: rgba(255, 250, 242, 0.92);
backdrop-filter: blur(4px);
}}
h1, h2 {{ margin: 0 0 12px; }}
.muted {{ color: var(--muted); }}
.grid {{
display: grid;
gap: 16px;
grid-template-columns: repeat(auto-fit, minmax(280px, 1fr));
}}
.card {{
padding: 18px;
border: 1px solid var(--line);
border-radius: 16px;
background: var(--panel);
box-shadow: 0 8px 24px rgba(45, 36, 29, 0.06);
}}
label {{
display: block;
margin: 10px 0 6px;
font-size: 0.92rem;
font-weight: 600;
}}
input, select, textarea, button {{
width: 100%;
border-radius: 10px;
border: 1px solid var(--line);
padding: 10px 12px;
font: inherit;
}}
textarea {{ min-height: 110px; resize: vertical; }}
.row {{
display: grid;
gap: 12px;
grid-template-columns: repeat(2, minmax(0, 1fr));
}}
.inline {{
display: flex;
gap: 10px;
align-items: center;
margin-top: 12px;
}}
.inline input[type="checkbox"] {{ width: auto; }}
button {{
cursor: pointer;
font-weight: 700;
color: #fff;
background: var(--accent);
border: none;
}}
button.alt {{ background: var(--accent-2); }}
pre {{
overflow: auto;
margin: 0;
padding: 12px;
border-radius: 12px;
background: #241f1a;
color: #f7efe4;
min-height: 140px;
}}
.actions {{
display: flex;
gap: 10px;
flex-wrap: wrap;
margin-top: 12px;
}}
.actions button {{
width: auto;
min-width: 140px;
}}
@media (max-width: 640px) {{
.row {{ grid-template-columns: 1fr; }}
.actions button {{ width: 100%; }}
}}
</style>
</head>
<body>
<main>
<section class="hero">
<h1>Fichero Printer</h1>
<p class="muted">Home Assistant print console for status, text labels, and image uploads.</p>
<p class="muted">API docs remain available at <a href="docs">/docs</a>.</p>
</section>
<section class="grid">
<div class="card">
<h2>Connection</h2>
<label for="address">Printer address</label>
<div style="display:flex;gap:8px;align-items:center">
<input id="address" value="{default_address}" placeholder="C9:48:8A:69:D5:C0" style="flex:1;width:auto">
<button type="button" id="scan-btn" class="alt" style="width:auto;white-space:nowrap" onclick="scanAddress()">&#128268; Scan</button>
</div>
try:
template_path = Path(__file__).parent / "index.html"
template = template_path.read_text(encoding="utf-8")
except FileNotFoundError:
return "<h1>Error: index.html not found</h1>"
<div class="row">
<div>
<label for="transport">Transport</label>
<select id="transport">
<option value="ble"{" selected" if default_transport == "ble" else ""}>BLE</option>
<option value="classic"{" selected" if default_transport == "classic" else ""}>Classic</option>
</select>
</div>
<div>
<label for="channel">RFCOMM channel</label>
<input id="channel" type="number" min="1" max="30" value="{_DEFAULT_CHANNEL}">
</div>
</div>
<div class="actions">
<button type="button" class="alt" onclick="runGet('status')">Get Status</button>
<button type="button" class="alt" onclick="runGet('info')">Get Info</button>
</div>
</div>
<div class="card">
<h2>Output</h2>
<pre id="output">Ready.</pre>
</div>
<div class="card">
<h2>Print Text</h2>
<label for="text">Text</label>
<textarea id="text" placeholder="Hello from Home Assistant"></textarea>
<div class="row">
<div>
<label for="text_density">Density</label>
<select id="text_density">
<option value="0">0</option>
<option value="1">1</option>
<option value="2" selected>2</option>
</select>
</div>
<div>
<label for="text_copies">Copies</label>
<input id="text_copies" type="number" min="1" max="99" value="1">
</div>
</div>
<div class="row">
<div>
<label for="text_font_size">Font size</label>
<input id="text_font_size" type="number" min="6" max="200" value="30">
</div>
<div>
<label for="text_label_length">Label length (mm)</label>
<input id="text_label_length" type="number" min="5" max="500" value="30">
</div>
</div>
<label for="text_paper">Paper</label>
<select id="text_paper">
<option value="gap" selected>gap</option>
<option value="black">black</option>
<option value="continuous">continuous</option>
</select>
<div class="actions">
<button type="button" onclick="printText()">Print Text</button>
</div>
</div>
<div class="card">
<h2>Print Image</h2>
<label for="image_file">Image file</label>
<input id="image_file" type="file" accept="image/*">
<div class="row">
<div>
<label for="image_density">Density</label>
<select id="image_density">
<option value="0">0</option>
<option value="1">1</option>
<option value="2" selected>2</option>
</select>
</div>
<div>
<label for="image_copies">Copies</label>
<input id="image_copies" type="number" min="1" max="99" value="1">
</div>
</div>
<div class="row">
<div>
<label for="image_label_length">Label length (mm)</label>
<input id="image_label_length" type="number" min="5" max="500" value="30">
</div>
<div class="inline">
<input id="image_dither" type="checkbox" checked>
<label for="image_dither">Enable dithering</label>
</div>
</div>
<label for="image_paper">Paper</label>
<select id="image_paper">
<option value="gap" selected>gap</option>
<option value="black">black</option>
<option value="continuous">continuous</option>
</select>
<div class="actions">
<button type="button" onclick="printImage()">Print Image</button>
</div>
</div>
</section>
</main>
<script>
function commonParams() {{
const address = document.getElementById("address").value.trim();
const classic = document.getElementById("transport").value === "classic";
const channel = document.getElementById("channel").value;
const params = new URLSearchParams();
if (address) params.set("address", address);
params.set("classic", String(classic));
params.set("channel", channel);
return params;
}}
async function showResponse(response) {{
const output = document.getElementById("output");
let data;
try {{
data = await response.json();
}} catch {{
data = {{ detail: await response.text() }};
}}
output.textContent = JSON.stringify({{ status: response.status, ok: response.ok, data }}, null, 2);
}}
async function runGet(path) {{
const response = await fetch(`${{path}}?${{commonParams().toString()}}`);
await showResponse(response);
}}
async function scanAddress() {{
const btn = document.getElementById("scan-btn");
const output = document.getElementById("output");
btn.disabled = true;
btn.textContent = "Scanning…";
output.textContent = "Scanning for printer (up to 8 s)…";
try {{
const response = await fetch("scan");
const data = await response.json();
if (response.ok && data.address) {{
document.getElementById("address").value = data.address;
output.textContent = JSON.stringify({{ status: response.status, ok: true, data }}, null, 2);
}} else {{
output.textContent = JSON.stringify({{ status: response.status, ok: false, data }}, null, 2);
}}
}} catch (e) {{
output.textContent = "Scan failed: " + e;
}} finally {{
btn.disabled = false;
btn.innerHTML = "&#128268; Scan";
}}
}}
async function printText() {{
const form = new FormData();
form.set("text", document.getElementById("text").value);
form.set("density", document.getElementById("text_density").value);
form.set("copies", document.getElementById("text_copies").value);
form.set("font_size", document.getElementById("text_font_size").value);
form.set("label_length", document.getElementById("text_label_length").value);
form.set("paper", document.getElementById("text_paper").value);
form.set("address", document.getElementById("address").value.trim());
form.set("classic", String(document.getElementById("transport").value === "classic"));
form.set("channel", document.getElementById("channel").value);
const response = await fetch("print/text", {{ method: "POST", body: form }});
await showResponse(response);
}}
async function printImage() {{
const fileInput = document.getElementById("image_file");
if (!fileInput.files.length) {{
document.getElementById("output").textContent = "Select an image file first.";
return;
}}
const form = new FormData();
form.set("file", fileInput.files[0]);
form.set("density", document.getElementById("image_density").value);
form.set("copies", document.getElementById("image_copies").value);
form.set("label_length", document.getElementById("image_label_length").value);
form.set("paper", document.getElementById("image_paper").value);
form.set("dither", String(document.getElementById("image_dither").checked));
form.set("address", document.getElementById("address").value.trim());
form.set("classic", String(document.getElementById("transport").value === "classic"));
form.set("channel", document.getElementById("channel").value);
const response = await fetch("print/image", {{ method: "POST", body: form }});
await showResponse(response);
}}
</script>
</body>
</html>"""
# Simple substitution for initial values
return (
template.replace("{default_address}", default_address)
.replace("{ble_selected}", " selected" if default_transport == "ble" else "")
.replace("{classic_selected}", " selected" if default_transport == "classic" else "")
.replace("{default_channel}", str(_DEFAULT_CHANNEL))
)
# ---------------------------------------------------------------------------
@@ -437,22 +132,6 @@ async def docs():
)
@app.get(
"/scan",
summary="Scan for printer",
response_description="BLE address of the discovered printer",
)
async def scan_printer():
"""Scan for a Fichero/D11s printer via BLE and return its address."""
try:
address = await find_printer()
except PrinterNotFound as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except Exception as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
return {"address": address}
@app.get(
"/status",
summary="Get printer status",
@@ -511,6 +190,90 @@ async def get_info(
return info
@app.post(
"/pair",
summary="Pair and trust a Bluetooth device",
status_code=200,
)
async def pair_device(
address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None,
):
"""
Attempt to pair and trust the device using `bluetoothctl`.
This is intended for setting up Classic Bluetooth connections.
"""
addr = _address(address)
if not addr:
raise HTTPException(status_code=422, detail="Address is required to pair.")
# Basic validation for MAC address format to mitigate injection risk.
if not re.match(r"^[0-9A-F]{2}(:[0-9A-F]{2}){5}$", addr, re.IGNORECASE):
raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}")
cmd = f'echo -e "pair {addr}\\ntrust {addr}\\nquit" | bluetoothctl'
try:
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15.0)
except FileNotFoundError:
raise HTTPException(status_code=500, detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?")
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="`bluetoothctl` command timed out after 15 seconds.")
output = stdout.decode(errors="ignore")
error = stderr.decode(errors="ignore")
if "Failed to pair" in output or "not available" in output.lower():
raise HTTPException(status_code=502, detail=f"Pairing failed. Output: {output}. Error: {error}")
return {"ok": True, "message": "Pair/trust command sent. Check output for details.", "output": output, "error": error}
@app.post(
"/unpair",
summary="Unpair a Bluetooth device",
status_code=200,
)
async def unpair_device(
address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None,
):
"""
Attempt to unpair the device using `bluetoothctl`.
"""
addr = _address(address)
if not addr:
raise HTTPException(status_code=422, detail="Address is required to unpair.")
# Basic validation for MAC address format to mitigate injection risk.
if not re.match(r"^[0-9A-F]{2}(:[0-9A-F]{2}){5}$", addr, re.IGNORECASE):
raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}")
cmd = f'echo -e "remove {addr}\\nquit" | bluetoothctl'
try:
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15.0)
except FileNotFoundError:
raise HTTPException(status_code=500, detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?")
except asyncio.TimeoutError:
raise HTTPException(status_code=504, detail="`bluetoothctl` command timed out after 15 seconds.")
output = stdout.decode(errors="ignore")
error = stderr.decode(errors="ignore")
if "Failed to remove" in output or "not available" in output.lower():
raise HTTPException(status_code=502, detail=f"Unpairing failed. Output: {output}. Error: {error}")
return {"ok": True, "message": "Unpair command sent. Check output for details.", "output": output, "error": error}
@app.post(
"/print/text",
summary="Print a text label",

View File

@@ -0,0 +1,307 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Fichero Printer</title>
<style>
:root {
--bg: #f4efe6;
--panel: #fffaf2;
--line: #d8cdbd;
--ink: #2d241d;
--muted: #6c6258;
--accent: #b55e33;
--accent-2: #245b4b;
}
* { box-sizing: border-box; }
body {
margin: 0;
font-family: "Noto Sans", system-ui, sans-serif;
color: var(--ink);
background:
radial-gradient(circle at top left, #fff8ed 0, transparent 35%),
linear-gradient(180deg, #efe4d3 0%, var(--bg) 100%);
}
main {
max-width: 980px;
margin: 0 auto;
padding: 24px 16px 40px;
}
.hero {
margin-bottom: 20px;
padding: 24px;
border: 1px solid var(--line);
border-radius: 18px;
background: rgba(255, 250, 242, 0.92);
backdrop-filter: blur(4px);
}
h1, h2 { margin: 0 0 12px; }
.muted { color: var(--muted); }
.grid {
display: grid;
gap: 16px;
grid-template-columns: repeat(auto-fit, minmax(280px, 1fr));
}
.card {
padding: 18px;
border: 1px solid var(--line);
border-radius: 16px;
background: var(--panel);
box-shadow: 0 8px 24px rgba(45, 36, 29, 0.06);
}
label {
display: block;
margin: 10px 0 6px;
font-size: 0.92rem;
font-weight: 600;
}
input, select, textarea, button {
width: 100%;
border-radius: 10px;
border: 1px solid var(--line);
padding: 10px 12px;
font: inherit;
}
textarea { min-height: 110px; resize: vertical; }
.row {
display: grid;
gap: 12px;
grid-template-columns: repeat(2, minmax(0, 1fr));
}
.inline {
display: flex;
gap: 10px;
align-items: center;
margin-top: 12px;
}
.inline input[type="checkbox"] { width: auto; }
button {
cursor: pointer;
font-weight: 700;
color: #fff;
background: var(--accent);
border: none;
}
button.alt { background: var(--accent-2); }
pre {
overflow: auto;
margin: 0;
padding: 12px;
border-radius: 12px;
background: #241f1a;
color: #f7efe4;
min-height: 140px;
}
.actions {
display: flex;
gap: 10px;
flex-wrap: wrap;
margin-top: 12px;
}
.actions button {
width: auto;
min-width: 140px;
}
@media (max-width: 640px) {
.row { grid-template-columns: 1fr; }
.actions button { width: 100%; }
}
</style>
</head>
<body>
<main>
<section class="hero">
<h1>Fichero Printer</h1>
<p class="muted">Home Assistant print console for status, text labels, and image uploads.</p>
<p class="muted">API docs remain available at <a href="docs">/docs</a>.</p>
</section>
<section class="grid">
<div class="card">
<h2>Connection</h2>
<label for="address">Printer address</label>
<input id="address" value="{default_address}" placeholder="C9:48:8A:69:D5:C0">
<div class="row">
<div>
<label for="transport">Transport</label>
<select id="transport">
<option value="ble"{ble_selected}>BLE</option>
<option value="classic"{classic_selected}>Classic</option>
</select>
</div>
<div>
<label for="channel">RFCOMM channel</label>
<input id="channel" type="number" min="1" max="30" value="{default_channel}">
</div>
</div>
<div class="actions">
<button type="button" class="alt" onclick="runPost('pair')">Pair Device</button>
<button type="button" class="alt" onclick="runPost('unpair')">Unpair Device</button>
<button type="button" class="alt" onclick="runGet('status')">Get Status</button>
<button type="button" class="alt" onclick="runGet('info')">Get Info</button>
</div>
</div>
<div class="card">
<h2>Output</h2>
<pre id="output">Ready.</pre>
</div>
<div class="card">
<h2>Print Text</h2>
<label for="text">Text</label>
<textarea id="text" placeholder="Hello from Home Assistant"></textarea>
<div class="row">
<div>
<label for="text_density">Density</label>
<select id="text_density">
<option value="0">0</option>
<option value="1">1</option>
<option value="2" selected>2</option>
</select>
</div>
<div>
<label for="text_copies">Copies</label>
<input id="text_copies" type="number" min="1" max="99" value="1">
</div>
</div>
<div class="row">
<div>
<label for="text_font_size">Font size</label>
<input id="text_font_size" type="number" min="6" max="200" value="30">
</div>
<div>
<label for="text_label_length">Label length (mm)</label>
<input id="text_label_length" type="number" min="5" max="500" value="30">
</div>
</div>
<label for="text_paper">Paper</label>
<select id="text_paper">
<option value="gap" selected>gap</option>
<option value="black">black</option>
<option value="continuous">continuous</option>
</select>
<div class="actions">
<button type="button" onclick="printText()">Print Text</button>
</div>
</div>
<div class="card">
<h2>Print Image</h2>
<label for="image_file">Image file</label>
<input id="image_file" type="file" accept="image/*">
<div class="row">
<div>
<label for="image_density">Density</label>
<select id="image_density">
<option value="0">0</option>
<option value="1">1</option>
<option value="2" selected>2</option>
</select>
</div>
<div>
<label for="image_copies">Copies</label>
<input id="image_copies" type="number" min="1" max="99" value="1">
</div>
</div>
<div class="row">
<div>
<label for="image_label_length">Label length (mm)</label>
<input id="image_label_length" type="number" min="5" max="500" value="30">
</div>
<div class="inline">
<input id="image_dither" type="checkbox" checked>
<label for="image_dither">Enable dithering</label>
</div>
</div>
<label for="image_paper">Paper</label>
<select id="image_paper">
<option value="gap" selected>gap</option>
<option value="black">black</option>
<option value="continuous">continuous</option>
</select>
<div class="actions">
<button type="button" onclick="printImage()">Print Image</button>
</div>
</div>
</section>
</main>
<script>
function commonParams() {
const address = document.getElementById("address").value.trim();
const classic = document.getElementById("transport").value === "classic";
const channel = document.getElementById("channel").value;
const params = new URLSearchParams();
if (address) params.set("address", address);
params.set("classic", String(classic));
params.set("channel", channel);
return params;
}
async function showResponse(response) {
const output = document.getElementById("output");
let data;
try {
data = await response.json();
} catch {
data = { detail: await response.text() };
}
output.textContent = JSON.stringify({ status: response.status, ok: response.ok, data }, null, 2);
}
async function runGet(path) {
const response = await fetch(`${path}?${commonParams().toString()}`);
await showResponse(response);
}
async function runPost(path) {
const form = new FormData();
const params = commonParams();
for (const [key, value] of params.entries()) {
form.set(key, value);
}
const response = await fetch(path, { method: "POST", body: form });
await showResponse(response);
}
async function printText() {
const form = new FormData();
form.set("text", document.getElementById("text").value);
form.set("density", document.getElementById("text_density").value);
form.set("copies", document.getElementById("text_copies").value);
form.set("font_size", document.getElementById("text_font_size").value);
form.set("label_length", document.getElementById("text_label_length").value);
form.set("paper", document.getElementById("text_paper").value);
form.set("address", document.getElementById("address").value.trim());
form.set("classic", String(document.getElementById("transport").value === "classic"));
form.set("channel", document.getElementById("channel").value);
const response = await fetch("print/text", { method: "POST", body: form });
await showResponse(response);
}
async function printImage() {
const fileInput = document.getElementById("image_file");
if (!fileInput.files.length) {
document.getElementById("output").textContent = "Select an image file first.";
return;
}
const form = new FormData();
form.set("file", fileInput.files[0]);
form.set("density", document.getElementById("image_density").value);
form.set("copies", document.getElementById("image_copies").value);
form.set("label_length", document.getElementById("image_label_length").value);
form.set("paper", document.getElementById("image_paper").value);
form.set("dither", String(document.getElementById("image_dither").checked));
form.set("address", document.getElementById("address").value.trim());
form.set("classic", String(document.getElementById("transport").value === "classic"));
form.set("channel", document.getElementById("channel").value);
const response = await fetch("print/image", { method: "POST", body: form });
await showResponse(response);
}
</script>
</body>
</html>

View File

@@ -8,6 +8,7 @@ Device class: AiYinNormalDevice (LuckPrinter SDK)
import asyncio
import sys
import errno
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
@@ -98,7 +99,19 @@ async def resolve_ble_target(address: str | None = None):
"""
if address:
device = await BleakScanner.find_device_by_address(address, timeout=8.0)
return device or address
if device is not None:
return device
# Fallback to active scan/match before giving up; do not fall back to
# raw address because BlueZ may then attempt BR/EDR and fail with
# br-connection-not-supported.
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.address and d.address.lower() == address.lower():
return d
raise PrinterNotFound(
f"BLE device {address} not found during scan. "
"Ensure printer is on, awake, and in range."
)
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.name and any(d.name.startswith(p) for p in PRINTER_NAME_PREFIXES):
@@ -415,20 +428,47 @@ async def connect(
yield pc
return
except (PrinterError, PrinterTimeout) as exc:
# On Linux, a stale BlueZ device state can cause RFCOMM connect()
# to fail with [Errno 12] Out of memory. This is a known quirk.
# We treat this specific error as a signal to fall back to BLE.
if isinstance(exc.__cause__, OSError) and exc.__cause__.errno == errno.ENOMEM:
print(
"Classic Bluetooth connection failed with [Errno 12] Out of memory. "
"Falling back to BLE connection."
)
classic = False # Modify flag to trigger BLE path below
last_exc = exc
break
last_exc = exc
if last_exc is not None:
raise PrinterError(
f"Classic Bluetooth connection failed for '{address}'. "
f"Tried channels: {channels}. Last error: {last_exc}"
) from last_exc
raise PrinterError(f"Classic Bluetooth connection failed for '{address}'.")
else:
# If the 'classic' flag is still true, it means the loop completed without
# hitting the ENOMEM fallback case, so all classic attempts failed.
if classic:
if last_exc is not None:
raise PrinterError(
f"Classic Bluetooth connection failed for '{address}'. "
f"Tried channels: {channels}. Last error: {last_exc}"
) from last_exc
raise PrinterError(f"Classic Bluetooth connection failed for '{address}'.")
# If classic=False initially, or if it was set to False for the ENOMEM fallback:
if not classic:
target = await resolve_ble_target(address)
def _is_retryable_ble_error(exc: Exception) -> bool:
msg = str(exc).lower()
return any(token in msg for token in ("timeout", "timed out", "br-connection-timeout"))
return any(
token in msg
for token in (
"timeout",
"timed out",
"br-connection-timeout",
"failed to discover services",
"device disconnected",
)
)
last_exc: Exception | None = None
forced_rescan_done = False
for attempt in range(1, BLE_CONNECT_RETRIES + 1):
try:
async with BleakClient(target) as client:
@@ -445,8 +485,15 @@ async def connect(
except BleakDBusError as exc:
msg = str(exc).lower()
if "br-connection-not-supported" in msg:
last_exc = exc
if not forced_rescan_done:
forced_rescan_done = True
target = await resolve_ble_target(None)
if attempt < BLE_CONNECT_RETRIES:
await asyncio.sleep(BLE_CONNECT_BACKOFF * attempt)
continue
raise PrinterError(
"BLE connection failed (br-connection-not-supported). "
"BLE connection failed (br-connection-not-supported) after LE rescan. "
"Try Classic Bluetooth with classic=true and channel=1."
) from exc
last_exc = exc

View File

@@ -0,0 +1,13 @@
configuration:
port:
name: "API-Port"
description: "Port des REST-API-Servers. Den obigen Port-Mapping-Eintrag entsprechend anpassen."
ble_address:
name: "Bluetooth-Adresse"
description: "Feste BLE-Adresse des Druckers (z.B. AA:BB:CC:DD:EE:FF). Leer lassen für automatischen Scan."
transport:
name: "Transport"
description: "Verbindungsart: 'ble' für Bluetooth Low Energy (Standard) oder 'classic' für RFCOMM."
channel:
name: "RFCOMM-Kanal"
description: "Classic-Bluetooth-RFCOMM-Kanal. Nur relevant wenn Transport auf 'classic' gesetzt ist."

View File

@@ -1,13 +1,13 @@
configuration:
port:
name: "API-Port"
description: "Port des REST-API-Servers. Den obigen Port-Mapping-Eintrag entsprechend anpassen."
name: "API Port"
description: "Port for the REST API server. Adjust the port mapping entry above accordingly."
ble_address:
name: "Bluetooth-Adresse"
description: "Feste BLE-Adresse des Druckers (z.B. AA:BB:CC:DD:EE:FF). Leer lassen für automatischen Scan."
name: "Bluetooth Address"
description: "Fixed BLE address of the printer (e.g., AA:BB:CC:DD:EE:FF). Leave empty for automatic scan."
transport:
name: "Transport"
description: "Verbindungsart: 'ble' für Bluetooth Low Energy (Standard) oder 'classic' für RFCOMM."
description: "Connection type: 'ble' for Bluetooth Low Energy (default) or 'classic' for RFCOMM."
channel:
name: "RFCOMM-Kanal"
description: "Classic-Bluetooth-RFCOMM-Kanal. Nur relevant wenn Transport auf 'classic' gesetzt ist."
name: "RFCOMM Channel"
description: "Classic Bluetooth RFCOMM channel. Only relevant if transport is set to 'classic'."

View File

@@ -1,28 +1,31 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project]
name = "fichero-printer"
version = "0.1.12"
description = "Fichero D11s thermal label printer - BLE CLI tool"
version = "0.1.27"
description = "Web GUI, Python CLI, and protocol documentation for the Fichero D11s thermal label printer."
readme = "README.md"
requires-python = ">=3.10"
license = {text = "MIT"}
authors = [
{name = "0xMH"},
{name = "Paul Kozber"},
]
dependencies = [
"bleak",
"numpy",
"pillow",
]
[project.optional-dependencies]
api = [
"fastapi>=0.111",
"uvicorn[standard]>=0.29",
"Pillow",
"fastapi",
"uvicorn[standard]",
"python-multipart>=0.0.9",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["fichero"]
[project.scripts]
fichero = "fichero.cli:main"
fichero-server = "fichero.api:main"
[tool.setuptools.packages.find]
where = ["fichero_printer"]
include = ["fichero*"]

0
sync_addon.sh Normal file
View File

View File

@@ -2,7 +2,7 @@
"name": "fichero-web",
"private": true,
"type": "module",
"version": "0.1.9",
"version": "0.1.13",
"scripts": {
"dev": "vite",
"build": "vite build",