3 Commits

Author SHA1 Message Date
paul2212
7843a38407 refactor 2026-03-16 12:43:26 +01:00
eee58431ab update version
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled
2026-03-16 09:26:06 +00:00
40a1f78f55 Merge pull request 'release/0.1.20' (#4) from release/0.1.20 into main
Some checks failed
Deploy to GitHub Pages / build (push) Has been cancelled
Deploy to GitHub Pages / deploy (push) Has been cancelled
Reviewed-on: #4
2026-03-16 09:21:35 +00:00
12 changed files with 70 additions and 1739 deletions

View File

@@ -4,6 +4,24 @@ All notable changes to this project are documented in this file.
The format is based on Keep a Changelog and this project uses Semantic Versioning. The format is based on Keep a Changelog and this project uses Semantic Versioning.
## [0.1.23] - 2026-03-08
### Changed
- Updated the Home Assistant add-on's `Dockerfile` to install the main library as a package, completing the project structure refactoring.
- Added `python-multipart` as an explicit dependency for the API server.
## [0.1.22] - 2026-03-08
### Changed
- **Refactored Project Structure**: Eliminated duplicated code by converting the project into an installable Python package. The Home Assistant add-on now installs the main library as a dependency instead of using a vendored copy, improving maintainability and preventing sync issues.
## [0.1.21] - 2026-03-08
### Fixed
- Synchronized the Home Assistant add-on's source code (`fichero_printer/fichero/`) with the main library to fix stale code and version mismatch issues.
## [0.1.20] - 2026-03-08 ## [0.1.20] - 2026-03-08
### Changed ### Changed

View File

@@ -75,7 +75,7 @@ async def lifespan(app: FastAPI): # noqa: ARG001
app = FastAPI( app = FastAPI(
title="Fichero Printer API", title="Fichero Printer API",
description="REST API for the Fichero D11s (AiYin) thermal label printer.", description="REST API for the Fichero D11s (AiYin) thermal label printer.",
version="0.1.20", version="0.1.23",
lifespan=lifespan, lifespan=lifespan,
docs_url=None, docs_url=None,
redoc_url=None, redoc_url=None,

View File

@@ -1,32 +1,27 @@
ARG BUILD_FROM ARG BUILD_FROM
FROM $BUILD_FROM FROM $BUILD_FROM
# Only dbus-dev needed to talk to the HOST BlueZ via D-Bus (host_dbus: true). # Install build tools for Python packages that need compilation (numpy, pillow)
# and dbus-dev for Bleak to communicate with the host's BlueZ via D-Bus.
# Do NOT install bluez here - we use the host BlueZ, not our own. # Do NOT install bluez here - we use the host BlueZ, not our own.
RUN apk add --no-cache \ RUN apk add --no-cache \
bash \ bash \
python3 \ python3 \
py3-pip \ py3-pip \
py3-numpy \ dbus-dev \
py3-pillow \ build-base
dbus-dev
# Pure-Python packages (bleak uses dbus-fast internally, no C compiler needed) # Copy the entire project into the container.
RUN pip3 install --no-cache-dir --break-system-packages \ # This requires the Docker build context to be the root of the repository.
"bleak>=0.21" \
"fastapi>=0.111" \
"uvicorn[standard]>=0.29" \
"python-multipart>=0.0.9"
# Copy the fichero Python package into the container
WORKDIR /app WORKDIR /app
COPY fichero/ /app/fichero/ COPY . .
# Make the package importable without installation # Install the fichero-printer package and all its dependencies from pyproject.toml.
ENV PYTHONPATH=/app # This makes the `fichero` and `fichero-server` commands available system-wide.
RUN pip3 install --no-cache-dir --break-system-packages .
# Copy startup script and normalise line endings (Windows CRLF -> LF) # Copy startup script and normalise line endings (Windows CRLF -> LF)
COPY run.sh /usr/bin/run.sh COPY fichero_printer/run.sh /usr/bin/run.sh
RUN sed -i 's/\r//' /usr/bin/run.sh && chmod +x /usr/bin/run.sh RUN sed -i 's/\r//' /usr/bin/run.sh && chmod +x /usr/bin/run.sh
CMD ["/usr/bin/run.sh"] CMD ["/usr/bin/run.sh"]

View File

@@ -1,5 +1,5 @@
name: "Fichero Printer" name: "Fichero Printer"
version: "0.1.15" version: "0.1.23"
slug: "fichero_printer" slug: "fichero_printer"
description: "REST API for the Fichero D11s (AiYin) thermal label printer over Bluetooth" description: "REST API for the Fichero D11s (AiYin) thermal label printer over Bluetooth"
url: "https://git.leuschner.dev/Tobias/Fichero" url: "https://git.leuschner.dev/Tobias/Fichero"

View File

@@ -1,25 +0,0 @@
"""Fichero D11s thermal label printer - BLE + Classic Bluetooth interface."""
from fichero.printer import (
RFCOMM_CHANNEL,
PrinterClient,
PrinterError,
PrinterNotFound,
PrinterNotReady,
PrinterStatus,
PrinterTimeout,
RFCOMMClient,
connect,
)
__all__ = [
"RFCOMM_CHANNEL",
"PrinterClient",
"PrinterError",
"PrinterNotFound",
"PrinterNotReady",
"PrinterStatus",
"PrinterTimeout",
"RFCOMMClient",
"connect",
]

View File

@@ -1,829 +0,0 @@
"""HTTP REST API for the Fichero D11s thermal label printer.
Start with:
fichero-server [--host HOST] [--port PORT]
or:
python -m fichero.api
Endpoints:
GET /status Printer status
GET /info Printer info (model, firmware, battery, …)
POST /print/text Print a text label
POST /print/image Print an uploaded image file
"""
from __future__ import annotations
import argparse
import asyncio
import io
import os
from contextlib import asynccontextmanager
from typing import Annotated
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.responses import HTMLResponse, RedirectResponse
from PIL import Image
from fichero.cli import DOTS_PER_MM, do_print
from fichero.imaging import text_to_image
from fichero.printer import (
PAPER_GAP,
PrinterError,
PrinterNotFound,
PrinterTimeout,
connect,
find_printer,
)
# ---------------------------------------------------------------------------
# Global connection settings (env vars or CLI flags at startup)
# ---------------------------------------------------------------------------
_DEFAULT_ADDRESS: str | None = os.environ.get("FICHERO_ADDR")
_DEFAULT_CLASSIC: bool = os.environ.get("FICHERO_TRANSPORT", "").lower() == "classic"
_DEFAULT_CHANNEL: int = int(os.environ.get("FICHERO_CHANNEL", "1"))
_PAPER_MAP = {"gap": 0, "black": 1, "continuous": 2}
def _parse_paper(value: str) -> int:
if value in _PAPER_MAP:
return _PAPER_MAP[value]
try:
val = int(value)
if 0 <= val <= 2:
return val
except ValueError:
pass
raise HTTPException(status_code=422, detail=f"Invalid paper type '{value}'. Use gap, black, continuous or 0-2.")
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI): # noqa: ARG001
yield
app = FastAPI(
title="Fichero Printer API",
description="REST API for the Fichero D11s (AiYin) thermal label printer.",
version="0.1.13",
lifespan=lifespan,
docs_url=None,
redoc_url=None,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
def _address(address: str | None) -> str | None:
"""Return the effective BLE address (request value overrides env default)."""
return address or _DEFAULT_ADDRESS
def _ui_html() -> str:
default_address = _DEFAULT_ADDRESS or ""
default_transport = "classic" if _DEFAULT_CLASSIC else "ble"
return f"""<!doctype html>
<html lang="en" data-theme="dark">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Fichero Printer</title>
<style>
*, *::before, *::after {{ box-sizing: border-box; margin: 0; padding: 0; }}
:root {{
--s0: #161819;
--s1: #1c1e20;
--s2: #232628;
--s3: #2a2d30;
--ink: #e4e7eb;
--muted: #949ba4;
--dim: #5d6670;
--brand: #07addf;
--brand-h: #0699c7;
--brand-glow: rgba(7,173,223,.18);
--ok: #6bb88a;
--warn: #d4a24c;
--err: #d45454;
--border: rgba(228,231,235,.08);
--border-e: rgba(228,231,235,.14);
--ctrl-bg: #141617;
--ctrl-border: rgba(228,231,235,.10);
--r: 10px;
--r-lg: 14px;
}}
html {{ height: 100%; }}
body {{
min-height: 100dvh;
font-family: "Inter", ui-sans-serif, system-ui, sans-serif;
font-size: 14px;
line-height: 1.5;
color: var(--ink);
background-color: var(--s0);
background-image:
radial-gradient(ellipse at 10% 90%, rgba(7,173,223,.07) 0%, transparent 55%),
radial-gradient(ellipse at 90% 5%, rgba(7,173,223,.04) 0%, transparent 55%);
background-attachment: fixed;
}}
/* ── Header ─────────────────────────────────────── */
header {{
position: sticky;
top: 0;
z-index: 100;
padding: 10px 20px;
display: flex;
align-items: center;
gap: 12px;
background: rgba(22,24,25,.82);
backdrop-filter: blur(16px) saturate(1.5);
-webkit-backdrop-filter: blur(16px) saturate(1.5);
border-bottom: 1px solid var(--border);
}}
.brand {{ font-size: 1.05rem; font-weight: 700; letter-spacing: -.025em; }}
.brand em {{ color: var(--brand); font-style: normal; }}
.header-right {{ margin-left: auto; display: flex; align-items: center; gap: 8px; }}
.status-pill {{
display: flex;
align-items: center;
gap: 6px;
padding: 4px 10px;
border-radius: 99px;
font-size: .78rem;
font-weight: 600;
background: var(--s2);
border: 1px solid var(--border-e);
color: var(--muted);
transition: .25s;
cursor: default;
white-space: nowrap;
}}
.status-pill .dot {{
width: 7px; height: 7px;
border-radius: 50%;
background: var(--dim);
transition: .25s;
}}
.status-pill.ok {{ color: var(--ok); border-color: rgba(107,184,138,.3); }}
.status-pill.ok .dot {{ background: var(--ok); box-shadow: 0 0 6px var(--ok); }}
.status-pill.err {{ color: var(--err); border-color: rgba(212,84,84,.3); }}
.status-pill.err .dot {{ background: var(--err); box-shadow: 0 0 6px var(--err); }}
/* ── Layout ─────────────────────────────────────── */
main {{
max-width: 1000px;
margin: 0 auto;
padding: 24px 16px 60px;
display: grid;
gap: 16px;
grid-template-columns: repeat(auto-fit, minmax(290px, 1fr));
}}
main > .full {{ grid-column: 1 / -1; }}
/* ── Cards ──────────────────────────────────────── */
.card {{
padding: 20px;
background: var(--s1);
border: 1px solid var(--border);
border-radius: var(--r-lg);
box-shadow: 0 8px 32px rgba(0,0,0,.3);
}}
.card-title {{
font-size: .7rem;
font-weight: 700;
letter-spacing: .1em;
text-transform: uppercase;
color: var(--dim);
margin-bottom: 14px;
}}
/* ── Form ───────────────────────────────────────── */
label {{
display: block;
font-size: .8rem;
font-weight: 600;
color: var(--muted);
margin: 12px 0 5px;
}}
label:first-child {{ margin-top: 0; }}
input, select, textarea {{
width: 100%;
padding: 8px 11px;
border-radius: var(--r);
border: 1px solid var(--ctrl-border);
background: var(--ctrl-bg);
color: var(--ink);
font: inherit;
transition: border-color .15s, box-shadow .15s;
outline: none;
}}
input:focus, select:focus, textarea:focus {{
border-color: var(--brand);
box-shadow: 0 0 0 3px rgba(7,173,223,.2);
}}
textarea {{ min-height: 90px; resize: vertical; }}
.two-col {{ display: grid; gap: 10px; grid-template-columns: 1fr 1fr; }}
.check-row {{ display: flex; align-items: center; gap: 8px; margin-top: 14px; }}
.check-row input[type=checkbox] {{ width: 15px; height: 15px; accent-color: var(--brand); }}
.check-row label {{ margin: 0; font-size: .85rem; color: var(--ink); }}
/* ── Buttons ────────────────────────────────────── */
.btn {{
display: inline-flex;
align-items: center;
justify-content: center;
gap: 6px;
padding: 8px 16px;
border-radius: var(--r);
border: none;
font: inherit;
font-size: .85rem;
font-weight: 600;
cursor: pointer;
transition: background .15s, box-shadow .15s, opacity .15s;
}}
.btn:disabled {{ opacity: .5; cursor: not-allowed; }}
.btn-primary {{
background: var(--brand);
color: #fff;
}}
.btn-primary:not(:disabled):hover {{
background: var(--brand-h);
box-shadow: 0 0 0 3px var(--brand-glow);
}}
.btn-secondary {{
background: var(--s3);
color: var(--ink);
border: 1px solid var(--border-e);
}}
.btn-secondary:not(:disabled):hover {{
background: var(--s3);
border-color: var(--brand);
}}
.btn-group {{ display: flex; gap: 8px; flex-wrap: wrap; margin-top: 14px; }}
.btn-group .btn {{ flex: 1; min-width: 110px; }}
/* ── Address row ────────────────────────────────── */
.addr-row {{ display: flex; gap: 8px; }}
.addr-row input {{ flex: 1; min-width: 0; }}
.addr-row .btn {{ padding: 8px 12px; white-space: nowrap; flex-shrink: 0; }}
/* ── Output / pre ───────────────────────────────── */
pre {{
background: var(--s0);
border: 1px solid var(--border);
border-radius: var(--r);
color: var(--muted);
font-family: "JetBrains Mono", "Fira Code", ui-monospace, monospace;
font-size: .78rem;
line-height: 1.6;
padding: 14px;
min-height: 130px;
overflow: auto;
white-space: pre-wrap;
word-break: break-all;
}}
pre.ok {{ color: var(--ok); border-color: rgba(107,184,138,.2); }}
pre.err {{ color: var(--err); border-color: rgba(212,84,84,.2); }}
/* ── Spinner ────────────────────────────────────── */
@keyframes spin {{ to {{ transform: rotate(360deg); }} }}
.spinner {{
display: inline-block;
width: 13px; height: 13px;
border: 2px solid rgba(255,255,255,.3);
border-top-color: #fff;
border-radius: 50%;
animation: spin .65s linear infinite;
}}
/* ── Footer ─────────────────────────────────────── */
footer {{
text-align: center;
padding: 16px;
font-size: .72rem;
color: var(--dim);
}}
footer a {{ color: var(--dim); text-decoration: none; }}
footer a:hover {{ color: var(--brand); }}
@media (max-width: 520px) {{
.two-col {{ grid-template-columns: 1fr; }}
.btn-group .btn {{ min-width: 0; }}
}}
</style>
</head>
<body>
<header>
<span class="brand">Fichero<em>Printer</em></span>
<div class="header-right">
<span class="status-pill" id="status-pill">
<span class="dot"></span>
<span id="status-text">Unknown</span>
</span>
<button class="btn btn-secondary" style="padding:6px 10px;font-size:.78rem" onclick="refreshStatus()">↻</button>
</div>
</header>
<main>
<!-- Connection card -->
<div class="card">
<p class="card-title">Connection</p>
<label for="address">Bluetooth address</label>
<div class="addr-row">
<input id="address" value="{default_address}" placeholder="AA:BB:CC:DD:EE:FF">
<button class="btn btn-secondary" id="scan-btn" onclick="scanAddress()">
<span id="scan-icon">⟳</span> Scan
</button>
</div>
<div class="two-col">
<div>
<label for="transport">Transport</label>
<select id="transport">
<option value="ble"{" selected" if default_transport == "ble" else ""}>BLE</option>
<option value="classic"{" selected" if default_transport == "classic" else ""}>Classic BT</option>
</select>
</div>
<div>
<label for="channel">RFCOMM channel</label>
<input id="channel" type="number" min="1" max="30" value="{_DEFAULT_CHANNEL}">
</div>
</div>
<div class="btn-group">
<button class="btn btn-secondary" onclick="runGet('status')">Status</button>
<button class="btn btn-secondary" onclick="runGet('info')">Device info</button>
</div>
</div>
<!-- Output card -->
<div class="card">
<p class="card-title">Response</p>
<pre id="output">Waiting for a command…</pre>
</div>
<!-- Print text card -->
<div class="card">
<p class="card-title">Print text</p>
<label for="text">Text</label>
<textarea id="text" placeholder="Hello from Home Assistant"></textarea>
<div class="two-col">
<div>
<label for="text_density">Density</label>
<select id="text_density">
<option value="0">0 Light</option>
<option value="1">1 Medium</option>
<option value="2" selected>2 Dark</option>
</select>
</div>
<div>
<label for="text_copies">Copies</label>
<input id="text_copies" type="number" min="1" max="99" value="1">
</div>
</div>
<div class="two-col">
<div>
<label for="text_font_size">Font size (pt)</label>
<input id="text_font_size" type="number" min="6" max="200" value="30">
</div>
<div>
<label for="text_label_length">Label length (mm)</label>
<input id="text_label_length" type="number" min="5" max="500" value="30">
</div>
</div>
<label for="text_paper">Paper type</label>
<select id="text_paper">
<option value="gap" selected>Gap / label</option>
<option value="black">Black mark</option>
<option value="continuous">Continuous</option>
</select>
<div class="btn-group">
<button class="btn btn-primary" onclick="printText()">🖨 Print text</button>
</div>
</div>
<!-- Print image card -->
<div class="card">
<p class="card-title">Print image</p>
<label for="image_file">Image file (PNG / JPEG / BMP / WEBP)</label>
<input id="image_file" type="file" accept="image/*">
<div class="two-col">
<div>
<label for="image_density">Density</label>
<select id="image_density">
<option value="0">0 Light</option>
<option value="1">1 Medium</option>
<option value="2" selected>2 Dark</option>
</select>
</div>
<div>
<label for="image_copies">Copies</label>
<input id="image_copies" type="number" min="1" max="99" value="1">
</div>
</div>
<div class="two-col">
<div>
<label for="image_label_length">Label length (mm)</label>
<input id="image_label_length" type="number" min="5" max="500" value="30">
</div>
<div>
<label for="image_paper">Paper type</label>
<select id="image_paper">
<option value="gap" selected>Gap / label</option>
<option value="black">Black mark</option>
<option value="continuous">Continuous</option>
</select>
</div>
</div>
<div class="check-row">
<input id="image_dither" type="checkbox" checked>
<label for="image_dither">Floyd-Steinberg dithering</label>
</div>
<div class="btn-group">
<button class="btn btn-primary" onclick="printImage()">🖨 Print image</button>
</div>
</div>
</main>
<footer>
Fichero Printer REST API &nbsp;·&nbsp;
<a href="docs">Swagger UI</a>
</footer>
<script>
// ── Helpers ──────────────────────────────────────────
function commonParams() {{
const address = document.getElementById("address").value.trim();
const classic = document.getElementById("transport").value === "classic";
const channel = document.getElementById("channel").value;
const p = new URLSearchParams();
if (address) p.set("address", address);
p.set("classic", String(classic));
p.set("channel", channel);
return p;
}}
function setOutput(obj, isOk) {{
const pre = document.getElementById("output");
pre.textContent = JSON.stringify(obj, null, 2);
pre.className = isOk ? "ok" : "err";
}}
async function showResponse(response) {{
let data;
try {{ data = await response.json(); }} catch {{ data = {{ detail: await response.text() }}; }}
setOutput({{ status: response.status, ok: response.ok, data }}, response.ok);
}}
// ── Status pill ──────────────────────────────────────
async function refreshStatus() {{
try {{
const r = await fetch("status?" + commonParams().toString());
const d = await r.json();
const pill = document.getElementById("status-pill");
const txt = document.getElementById("status-text");
if (r.ok && d.ok) {{
pill.className = "status-pill ok";
txt.textContent = d.no_paper ? "No paper" : d.charging ? "Charging" : "Ready";
}} else {{
pill.className = "status-pill err";
txt.textContent = r.ok ? "Not ready" : "Not found";
}}
}} catch {{ /* ignore */ }}
}}
// auto-refresh status on load
window.addEventListener("DOMContentLoaded", refreshStatus);
// ── Scan ─────────────────────────────────────────────
async function scanAddress() {{
const btn = document.getElementById("scan-btn");
const icon = document.getElementById("scan-icon");
btn.disabled = true;
icon.outerHTML = '<span id="scan-icon" class="spinner"></span>';
setOutput({{ message: "Scanning for printer (up to 8 s)…" }}, true);
try {{
const r = await fetch("scan");
const d = await r.json();
if (r.ok && d.address) {{
document.getElementById("address").value = d.address;
}}
setOutput({{ status: r.status, ok: r.ok, data: d }}, r.ok);
}} catch (e) {{
setOutput({{ error: String(e) }}, false);
}} finally {{
btn.disabled = false;
document.getElementById("scan-icon").outerHTML = '<span id="scan-icon">⟳</span>';
}}
}}
async function runGet(path) {{
setOutput({{ message: "Loading…" }}, true);
const r = await fetch(path + "?" + commonParams().toString());
await showResponse(r);
}}
// ── Print text ───────────────────────────────────────
async function printText() {{
const form = new FormData();
form.set("text", document.getElementById("text").value);
form.set("density", document.getElementById("text_density").value);
form.set("copies", document.getElementById("text_copies").value);
form.set("font_size", document.getElementById("text_font_size").value);
form.set("label_length", document.getElementById("text_label_length").value);
form.set("paper", document.getElementById("text_paper").value);
form.set("address", document.getElementById("address").value.trim());
form.set("classic", String(document.getElementById("transport").value === "classic"));
form.set("channel", document.getElementById("channel").value);
setOutput({{ message: "Sending to printer…" }}, true);
const r = await fetch("print/text", {{ method: "POST", body: form }});
await showResponse(r);
if (r.ok) refreshStatus();
}}
// ── Print image ──────────────────────────────────────
async function printImage() {{
const fi = document.getElementById("image_file");
if (!fi.files.length) {{ setOutput({{ error: "Select an image file first." }}, false); return; }}
const form = new FormData();
form.set("file", fi.files[0]);
form.set("density", document.getElementById("image_density").value);
form.set("copies", document.getElementById("image_copies").value);
form.set("label_length", document.getElementById("image_label_length").value);
form.set("paper", document.getElementById("image_paper").value);
form.set("dither", String(document.getElementById("image_dither").checked));
form.set("address", document.getElementById("address").value.trim());
form.set("classic", String(document.getElementById("transport").value === "classic"));
form.set("channel", document.getElementById("channel").value);
setOutput({{ message: "Sending to printer…" }}, true);
const r = await fetch("print/image", {{ method: "POST", body: form }});
await showResponse(r);
if (r.ok) refreshStatus();
}}
</script>
</body>
</html>"""
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/", include_in_schema=False, response_class=HTMLResponse)
async def root():
"""Serve a compact printer UI for Home Assistant."""
return HTMLResponse(_ui_html())
@app.get("/docs", include_in_schema=False)
async def docs():
"""Serve Swagger UI with ingress-safe relative OpenAPI URL."""
return get_swagger_ui_html(
openapi_url="openapi.json",
title=f"{app.title} - Swagger UI",
)
@app.get(
"/scan",
summary="Scan for printer",
response_description="BLE address of the discovered printer",
)
async def scan_printer():
"""Scan for a Fichero/D11s printer via BLE and return its address."""
try:
address = await find_printer()
except PrinterNotFound as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except Exception as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
return {"address": address}
@app.get(
"/status",
summary="Get printer status",
response_description="Current printer status flags",
)
async def get_status(
address: str | None = None,
classic: bool = _DEFAULT_CLASSIC,
channel: int = _DEFAULT_CHANNEL,
):
"""Return the real-time status of the printer (paper, battery, heat, …)."""
try:
async with connect(_address(address), classic=classic, channel=channel) as pc:
status = await pc.get_status()
except PrinterNotFound as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except PrinterTimeout as exc:
raise HTTPException(status_code=504, detail=str(exc)) from exc
except PrinterError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
return {
"ok": status.ok,
"printing": status.printing,
"cover_open": status.cover_open,
"no_paper": status.no_paper,
"low_battery": status.low_battery,
"overheated": status.overheated,
"charging": status.charging,
"raw": status.raw,
}
@app.get(
"/info",
summary="Get printer info",
response_description="Model, firmware, serial number and battery level",
)
async def get_info(
address: str | None = None,
classic: bool = _DEFAULT_CLASSIC,
channel: int = _DEFAULT_CHANNEL,
):
"""Return static and dynamic printer information."""
try:
async with connect(_address(address), classic=classic, channel=channel) as pc:
info = await pc.get_info()
info.update(await pc.get_all_info())
except PrinterNotFound as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except PrinterTimeout as exc:
raise HTTPException(status_code=504, detail=str(exc)) from exc
except PrinterError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
return info
@app.post(
"/print/text",
summary="Print a text label",
status_code=200,
)
async def print_text(
text: Annotated[str, Form(description="Text to print on the label")],
density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
font_size: Annotated[int, Form(description="Font size in points", ge=6, le=200)] = 30,
label_length: Annotated[int | None, Form(description="Label length in mm (overrides label_height)", ge=5, le=500)] = None,
label_height: Annotated[int, Form(description="Label height in pixels", ge=40, le=4000)] = 240,
address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
classic: Annotated[bool, Form(description="Use Classic Bluetooth RFCOMM")] = _DEFAULT_CLASSIC,
channel: Annotated[int, Form(description="RFCOMM channel")] = _DEFAULT_CHANNEL,
):
"""Print a plain-text label.
The text is rendered as a 96 px wide, 1-bit image and sent to the printer.
"""
paper_val = _parse_paper(paper)
max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height
img = text_to_image(text, font_size=font_size, label_height=max_rows)
try:
async with connect(_address(address), classic=classic, channel=channel) as pc:
ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies,
dither=False, max_rows=max_rows)
except PrinterNotFound as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except PrinterTimeout as exc:
raise HTTPException(status_code=504, detail=str(exc)) from exc
except PrinterError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
if not ok:
raise HTTPException(status_code=502, detail="Printer did not confirm completion.")
return {"ok": True, "copies": copies, "text": text}
@app.post(
"/print/image",
summary="Print an image",
status_code=200,
)
async def print_image(
file: Annotated[UploadFile, File(description="Image file to print (PNG, JPEG, BMP, …)")],
density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
dither: Annotated[bool, Form(description="Apply Floyd-Steinberg dithering")] = True,
label_length: Annotated[int | None, Form(description="Max label length in mm (overrides label_height)", ge=5, le=500)] = None,
label_height: Annotated[int, Form(description="Max label height in pixels", ge=40, le=4000)] = 240,
address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
classic: Annotated[bool, Form(description="Use Classic Bluetooth RFCOMM")] = _DEFAULT_CLASSIC,
channel: Annotated[int, Form(description="RFCOMM channel")] = _DEFAULT_CHANNEL,
):
"""Print an image file.
The image is resized to 96 px wide, optionally dithered to 1-bit, and sent to the printer.
Supported formats: PNG, JPEG, BMP, GIF, TIFF, WEBP.
"""
# Validate content type loosely — Pillow will raise on unsupported data
data = await file.read()
if not data:
raise HTTPException(status_code=422, detail="Uploaded file is empty.")
try:
img = Image.open(io.BytesIO(data))
img.load() # ensure the image is fully decoded
except Exception as exc:
raise HTTPException(status_code=422, detail=f"Cannot decode image: {exc}") from exc
paper_val = _parse_paper(paper)
max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height
try:
async with connect(_address(address), classic=classic, channel=channel) as pc:
ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies,
dither=dither, max_rows=max_rows)
except PrinterNotFound as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except PrinterTimeout as exc:
raise HTTPException(status_code=504, detail=str(exc)) from exc
except PrinterError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
if not ok:
raise HTTPException(status_code=502, detail="Printer did not confirm completion.")
return {"ok": True, "copies": copies, "filename": file.filename}
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
"""Start the Fichero HTTP API server."""
global _DEFAULT_ADDRESS, _DEFAULT_CLASSIC, _DEFAULT_CHANNEL
try:
import uvicorn # noqa: PLC0415
except ImportError:
print("ERROR: uvicorn is required to run the API server.")
print("Install it with: pip install 'fichero-printer[api]'")
raise SystemExit(1) from None
parser = argparse.ArgumentParser(description="Fichero Printer API Server")
parser.add_argument("--host", default="127.0.0.1", help="Bind host (default: 127.0.0.1)")
parser.add_argument("--port", type=int, default=8765, help="Bind port (default: 8765)")
parser.add_argument("--address", default=_DEFAULT_ADDRESS, metavar="BLE_ADDR",
help="Default BLE address (or set FICHERO_ADDR env var)")
parser.add_argument("--classic", action="store_true", default=_DEFAULT_CLASSIC,
help="Default to Classic Bluetooth RFCOMM")
parser.add_argument("--channel", type=int, default=_DEFAULT_CHANNEL,
help="Default RFCOMM channel (default: 1)")
parser.add_argument("--reload", action="store_true", help="Enable auto-reload (development)")
args = parser.parse_args()
# Push CLI overrides into module-level defaults so all handlers pick them up
_DEFAULT_ADDRESS = args.address
_DEFAULT_CLASSIC = args.classic
_DEFAULT_CHANNEL = args.channel
# Pass the app object directly when not reloading so that the module-level
# globals (_DEFAULT_ADDRESS etc.) set above are visible to the handlers.
# The string form "fichero.api:app" is required for --reload only, because
# uvicorn's reloader needs to re-import the module in a worker process.
uvicorn.run(
"fichero.api:app" if args.reload else app,
host=args.host,
port=args.port,
reload=args.reload,
)
if __name__ == "__main__":
main()

View File

@@ -1,251 +0,0 @@
"""CLI for Fichero D11s thermal label printer."""
import argparse
import asyncio
import os
import sys
from PIL import Image
from fichero.imaging import image_to_raster, prepare_image, text_to_image
from fichero.printer import (
BYTES_PER_ROW,
DELAY_AFTER_DENSITY,
DELAY_AFTER_FEED,
DELAY_COMMAND_GAP,
DELAY_RASTER_SETTLE,
PAPER_GAP,
PrinterClient,
PrinterError,
PrinterNotReady,
connect,
)
DOTS_PER_MM = 8 # 203 DPI
def _resolve_label_height(args: argparse.Namespace) -> int:
"""Return label height in pixels from --label-length (mm) or --label-height (px)."""
if args.label_length is not None:
return args.label_length * DOTS_PER_MM
return args.label_height
async def do_print(
pc: PrinterClient,
img: Image.Image,
density: int = 1,
paper: int = PAPER_GAP,
copies: int = 1,
dither: bool = True,
max_rows: int = 240,
) -> bool:
img = prepare_image(img, max_rows=max_rows, dither=dither)
rows = img.height
raster = image_to_raster(img)
print(f" Image: {img.width}x{rows}, {len(raster)} bytes, {copies} copies")
await pc.set_density(density)
await asyncio.sleep(DELAY_AFTER_DENSITY)
for copy_num in range(copies):
if copies > 1:
print(f" Copy {copy_num + 1}/{copies}...")
# Check status before each copy (matches decompiled app behaviour)
status = await pc.get_status()
if not status.ok:
raise PrinterNotReady(f"Printer not ready: {status}")
# AiYin print sequence (from decompiled APK)
await pc.set_paper_type(paper)
await asyncio.sleep(DELAY_COMMAND_GAP)
await pc.wakeup()
await asyncio.sleep(DELAY_COMMAND_GAP)
await pc.enable()
await asyncio.sleep(DELAY_COMMAND_GAP)
# Raster image: GS v 0 m xL xH yL yH <data>
yl = rows & 0xFF
yh = (rows >> 8) & 0xFF
header = bytes([0x1D, 0x76, 0x30, 0x00, BYTES_PER_ROW, 0x00, yl, yh])
await pc.send_chunked(header + raster)
await asyncio.sleep(DELAY_RASTER_SETTLE)
await pc.form_feed()
await asyncio.sleep(DELAY_AFTER_FEED)
ok = await pc.stop_print()
if not ok:
print(" WARNING: no OK/0xAA from stop command")
return True
async def cmd_info(args: argparse.Namespace) -> None:
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
info = await pc.get_info()
for k, v in info.items():
print(f" {k}: {v}")
print()
all_info = await pc.get_all_info()
for k, v in all_info.items():
print(f" {k}: {v}")
async def cmd_status(args: argparse.Namespace) -> None:
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
status = await pc.get_status()
print(f" Status: {status}")
print(f" Raw: 0x{status.raw:02X} ({status.raw:08b})")
print(f" printing={status.printing} cover_open={status.cover_open} "
f"no_paper={status.no_paper} low_battery={status.low_battery} "
f"overheated={status.overheated} charging={status.charging}")
async def cmd_text(args: argparse.Namespace) -> None:
text = " ".join(args.text)
label_h = _resolve_label_height(args)
img = text_to_image(text, font_size=args.font_size, label_height=label_h)
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
print(f'Printing "{text}"...')
ok = await do_print(pc, img, args.density, paper=args.paper,
copies=args.copies, dither=False, max_rows=label_h)
print("Done." if ok else "FAILED.")
async def cmd_image(args: argparse.Namespace) -> None:
img = Image.open(args.path)
label_h = _resolve_label_height(args)
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
print(f"Printing {args.path}...")
ok = await do_print(pc, img, args.density, paper=args.paper,
copies=args.copies, dither=not args.no_dither,
max_rows=label_h)
print("Done." if ok else "FAILED.")
async def cmd_set(args: argparse.Namespace) -> None:
async with connect(args.address, classic=args.classic, channel=args.channel) as pc:
if args.setting == "density":
val = int(args.value)
if not 0 <= val <= 2:
print(" ERROR: density must be 0, 1, or 2")
return
ok = await pc.set_density(val)
print(f" Set density={args.value}: {'OK' if ok else 'FAILED'}")
elif args.setting == "shutdown":
val = int(args.value)
if not 1 <= val <= 480:
print(" ERROR: shutdown must be 1-480 minutes")
return
ok = await pc.set_shutdown_time(val)
print(f" Set shutdown={args.value}min: {'OK' if ok else 'FAILED'}")
elif args.setting == "paper":
types = {"gap": 0, "black": 1, "continuous": 2}
if args.value in types:
val = types[args.value]
else:
try:
val = int(args.value)
except ValueError:
print(" ERROR: paper must be gap, black, continuous, or 0-2")
return
if not 0 <= val <= 2:
print(" ERROR: paper must be gap, black, continuous, or 0-2")
return
ok = await pc.set_paper_type(val)
print(f" Set paper={args.value}: {'OK' if ok else 'FAILED'}")
def _add_paper_arg(parser: argparse.ArgumentParser) -> None:
"""Add --paper argument to a subparser."""
parser.add_argument(
"--paper", type=str, default="gap",
help="Paper type: gap (default), black, continuous",
)
def _parse_paper(value: str) -> int:
"""Convert paper string/int to protocol value."""
types = {"gap": 0, "black": 1, "continuous": 2}
if value in types:
return types[value]
try:
val = int(value)
if 0 <= val <= 2:
return val
except ValueError:
pass
print(f" WARNING: unknown paper type '{value}', using gap")
return 0
def main() -> None:
parser = argparse.ArgumentParser(description="Fichero D11s Label Printer")
parser.add_argument("--address", default=os.environ.get("FICHERO_ADDR"),
help="BLE address (skip scanning, or set FICHERO_ADDR)")
parser.add_argument("--classic", action="store_true",
default=os.environ.get("FICHERO_TRANSPORT", "").lower() == "classic",
help="Use Classic Bluetooth (RFCOMM) instead of BLE (Linux only, "
"or set FICHERO_TRANSPORT=classic)")
parser.add_argument("--channel", type=int, default=1,
help="RFCOMM channel (default: 1, only used with --classic)")
sub = parser.add_subparsers(dest="command", required=True)
p_info = sub.add_parser("info", help="Show device info")
p_info.set_defaults(func=cmd_info)
p_status = sub.add_parser("status", help="Show detailed status")
p_status.set_defaults(func=cmd_status)
p_text = sub.add_parser("text", help="Print text label")
p_text.add_argument("text", nargs="+", help="Text to print")
p_text.add_argument("--density", type=int, default=2, choices=[0, 1, 2],
help="Print density: 0=light, 1=medium, 2=thick")
p_text.add_argument("--copies", type=int, default=1, help="Number of copies")
p_text.add_argument("--font-size", type=int, default=30, help="Font size in points")
p_text.add_argument("--label-length", type=int, default=None,
help="Label length in mm (default: 30mm)")
p_text.add_argument("--label-height", type=int, default=240,
help="Label height in pixels (default: 240, prefer --label-length)")
_add_paper_arg(p_text)
p_text.set_defaults(func=cmd_text)
p_image = sub.add_parser("image", help="Print image file")
p_image.add_argument("path", help="Path to image file")
p_image.add_argument("--density", type=int, default=2, choices=[0, 1, 2],
help="Print density: 0=light, 1=medium, 2=thick")
p_image.add_argument("--copies", type=int, default=1, help="Number of copies")
p_image.add_argument("--no-dither", action="store_true",
help="Disable Floyd-Steinberg dithering (use simple threshold)")
p_image.add_argument("--label-length", type=int, default=None,
help="Label length in mm (default: 30mm)")
p_image.add_argument("--label-height", type=int, default=240,
help="Max image height in pixels (default: 240, prefer --label-length)")
_add_paper_arg(p_image)
p_image.set_defaults(func=cmd_image)
p_set = sub.add_parser("set", help="Change printer settings")
p_set.add_argument("setting", choices=["density", "shutdown", "paper"],
help="Setting to change")
p_set.add_argument("value", help="New value")
p_set.set_defaults(func=cmd_set)
args = parser.parse_args()
# Resolve --paper string to int for print commands
if hasattr(args, "paper") and isinstance(args.paper, str):
args.paper = _parse_paper(args.paper)
try:
asyncio.run(args.func(args))
except PrinterError as e:
print(f" ERROR: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,97 +0,0 @@
"""Image processing for Fichero D11s thermal label printer."""
import logging
import numpy as np
from PIL import Image, ImageDraw, ImageFont, ImageOps
from fichero.printer import PRINTHEAD_PX
log = logging.getLogger(__name__)
def floyd_steinberg_dither(img: Image.Image) -> Image.Image:
"""Floyd-Steinberg error-diffusion dithering to 1-bit.
Same algorithm as PrinterImageProcessor.ditherFloydSteinberg() in the
decompiled Fichero APK: distributes quantisation error to neighbouring
pixels with weights 7/16, 3/16, 5/16, 1/16.
"""
arr = np.array(img, dtype=np.float32)
h, w = arr.shape
for y in range(h):
for x in range(w):
old = arr[y, x]
new = 0.0 if old < 128 else 255.0
arr[y, x] = new
err = old - new
if x + 1 < w:
arr[y, x + 1] += err * 7 / 16
if y + 1 < h:
if x - 1 >= 0:
arr[y + 1, x - 1] += err * 3 / 16
arr[y + 1, x] += err * 5 / 16
if x + 1 < w:
arr[y + 1, x + 1] += err * 1 / 16
arr = np.clip(arr, 0, 255).astype(np.uint8)
return Image.fromarray(arr, mode="L")
def prepare_image(
img: Image.Image, max_rows: int = 240, dither: bool = True
) -> Image.Image:
"""Convert any image to 96px wide, 1-bit, black on white.
When *dither* is True (default), uses Floyd-Steinberg error diffusion
for better quality on photos and gradients. Set False for crisp text.
"""
img = img.convert("L")
w, h = img.size
new_h = int(h * (PRINTHEAD_PX / w))
img = img.resize((PRINTHEAD_PX, new_h), Image.LANCZOS)
if new_h > max_rows:
log.warning("Image height %dpx exceeds max %dpx, cropping bottom", new_h, max_rows)
img = img.crop((0, 0, PRINTHEAD_PX, max_rows))
img = ImageOps.autocontrast(img, cutoff=1)
if dither:
img = floyd_steinberg_dither(img)
# Pack to 1-bit. PIL mode "1" tobytes() uses 0-bit=black, 1-bit=white,
# but the printer wants 1-bit=black. Mapping dark->1 via point() inverts
# the PIL convention so the final packed bits match what the printer needs.
img = img.point(lambda x: 1 if x < 128 else 0, "1")
return img
def image_to_raster(img: Image.Image) -> bytes:
"""Pack 1-bit image into raw raster bytes, MSB first."""
if img.mode != "1":
raise ValueError(f"Expected mode '1', got '{img.mode}'")
if img.width != PRINTHEAD_PX:
raise ValueError(f"Expected width {PRINTHEAD_PX}, got {img.width}")
return img.tobytes()
def text_to_image(text: str, font_size: int = 30, label_height: int = 240) -> Image.Image:
"""Render crisp 1-bit text, rotated 90 degrees for label printing."""
canvas_w = label_height
canvas_h = PRINTHEAD_PX
img = Image.new("L", (canvas_w, canvas_h), 255)
draw = ImageDraw.Draw(img)
draw.fontmode = "1" # disable antialiasing - pure 1-bit glyph rendering
font = ImageFont.load_default(size=font_size)
bbox = draw.textbbox((0, 0), text, font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
x = (canvas_w - tw) // 2 - bbox[0]
y = (canvas_h - th) // 2 - bbox[1]
draw.text((x, y), text, fill=0, font=font)
img = img.rotate(90, expand=True)
return img

View File

@@ -1,496 +0,0 @@
"""
Fichero / D11s thermal label printer - BLE + Classic Bluetooth interface.
Protocol reverse-engineered from decompiled Fichero APK (com.lj.fichero).
Device class: AiYinNormalDevice (LuckPrinter SDK)
96px wide printhead (12 bytes/row), 203 DPI, prints 1-bit raster images.
"""
import asyncio
import sys
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from bleak import BleakClient, BleakGATTCharacteristic, BleakScanner
from bleak.exc import BleakDBusError, BleakError
# --- RFCOMM (Classic Bluetooth) support - Linux + Windows (Python 3.9+) ---
_RFCOMM_AVAILABLE = False
if sys.platform in ("linux", "win32"):
import socket as _socket
_RFCOMM_AVAILABLE = hasattr(_socket, "AF_BLUETOOTH")
RFCOMM_CHANNEL = 1
# --- BLE identifiers ---
PRINTER_NAME_PREFIXES = ("FICHERO", "D11s_")
# Using the 18f0 service (any of the four BLE UART services work)
WRITE_UUID = "00002af1-0000-1000-8000-00805f9b34fb"
NOTIFY_UUID = "00002af0-0000-1000-8000-00805f9b34fb"
# --- Printhead ---
PRINTHEAD_PX = 96
BYTES_PER_ROW = PRINTHEAD_PX // 8 # 12
CHUNK_SIZE_BLE = 200 # BLE MTU-limited
CHUNK_SIZE_CLASSIC = 16384 # from decompiled app (C1703d.java), stream-based
# --- Paper types for 10 FF 84 nn ---
PAPER_GAP = 0x00
PAPER_BLACK_MARK = 0x01
PAPER_CONTINUOUS = 0x02
# --- Timing (seconds) - empirically tuned against D11s fw 2.4.6 ---
DELAY_AFTER_DENSITY = 0.10 # printer needs time to apply density setting
DELAY_COMMAND_GAP = 0.05 # minimum gap between sequential commands
DELAY_CHUNK_GAP = 0.02 # inter-chunk pacing for BLE throughput
DELAY_RASTER_SETTLE = 0.50 # wait for printhead after raster transfer
DELAY_AFTER_FEED = 0.30 # wait after form feed before stop command
DELAY_NOTIFY_EXTRA = 0.05 # extra wait for trailing BLE notification fragments
BLE_CONNECT_RETRIES = 3 # retry transient BLE connect failures
BLE_CONNECT_BACKOFF = 0.7 # base backoff in seconds (linear: n * base)
# --- Exceptions ---
class PrinterError(Exception):
"""Base exception for printer operations."""
class PrinterNotFound(PrinterError):
"""No Fichero/D11s printer found during BLE scan."""
class PrinterTimeout(PrinterError):
"""Printer did not respond within the expected time."""
class PrinterNotReady(PrinterError):
"""Printer status indicates it cannot print."""
# --- Discovery ---
async def find_printer() -> str:
"""Scan BLE for a Fichero/D11s printer. Returns the address."""
print("Scanning for printer...")
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.name and any(d.name.startswith(p) for p in PRINTER_NAME_PREFIXES):
print(f" Found {d.name} at {d.address}")
return d.address
raise PrinterNotFound("No Fichero/D11s printer found. Is it turned on?")
async def resolve_ble_target(address: str | None = None):
"""Resolve a BLE target as Bleak device object when possible.
Passing a discovered device object to BleakClient helps BlueZ keep the
correct LE context for dual-mode environments.
"""
if address:
device = await BleakScanner.find_device_by_address(address, timeout=8.0)
if device is not None:
return device
# Fallback to active scan/match before giving up; do not fall back to
# raw address because BlueZ may then attempt BR/EDR and fail with
# br-connection-not-supported.
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.address and d.address.lower() == address.lower():
return d
raise PrinterNotFound(
f"BLE device {address} not found during scan. "
"Ensure printer is on, awake, and in range."
)
devices = await BleakScanner.discover(timeout=8)
for d in devices:
if d.name and any(d.name.startswith(p) for p in PRINTER_NAME_PREFIXES):
print(f" Found {d.name} at {d.address}")
return d
raise PrinterNotFound("No Fichero/D11s printer found. Is it turned on?")
# --- Status ---
class PrinterStatus:
"""Parsed status byte from 10 FF 40."""
def __init__(self, byte: int):
self.raw = byte
self.printing = bool(byte & 0x01)
self.cover_open = bool(byte & 0x02)
self.no_paper = bool(byte & 0x04)
self.low_battery = bool(byte & 0x08)
self.overheated = bool(byte & 0x10 or byte & 0x40)
self.charging = bool(byte & 0x20)
def __str__(self) -> str:
flags = []
if self.printing:
flags.append("printing")
if self.cover_open:
flags.append("cover open")
if self.no_paper:
flags.append("no paper")
if self.low_battery:
flags.append("low battery")
if self.overheated:
flags.append("overheated")
if self.charging:
flags.append("charging")
return ", ".join(flags) if flags else "ready"
@property
def ok(self) -> bool:
return not (self.cover_open or self.no_paper or self.overheated)
# --- RFCOMM client (duck-types the BleakClient interface) ---
class RFCOMMClient:
"""Classic Bluetooth (RFCOMM) transport. Linux + Windows (Python 3.9+).
Implements the same async context manager + write_gatt_char/start_notify
interface that PrinterClient expects from BleakClient. Zero dependencies
beyond stdlib.
"""
is_classic = True # transport marker for PrinterClient chunk sizing
def __init__(self, address: str, channel: int = RFCOMM_CHANNEL):
self._address = address
self._channel = channel
self._sock: "_socket.socket | None" = None
self._reader_task: asyncio.Task | None = None
async def __aenter__(self) -> "RFCOMMClient":
if not _RFCOMM_AVAILABLE:
raise PrinterError(
"RFCOMM transport requires socket.AF_BLUETOOTH "
"(Linux with BlueZ, or Windows with Python 3.9+). "
"Not available on this platform."
)
import socket as _socket
sock = _socket.socket(
_socket.AF_BLUETOOTH, _socket.SOCK_STREAM, _socket.BTPROTO_RFCOMM
)
loop = asyncio.get_running_loop()
try:
# uvloop's sock_connect path goes through getaddrinfo and doesn't
# support AF_BLUETOOTH addresses reliably. Use direct socket connect
# in a thread instead.
sock.settimeout(10.0)
await loop.run_in_executor(
None,
sock.connect,
(self._address, self._channel),
)
sock.setblocking(False)
except asyncio.TimeoutError as exc:
sock.close()
raise PrinterTimeout(
f"Classic Bluetooth connection timed out to {self._address} (channel {self._channel})."
) from exc
except OSError as exc:
sock.close()
raise PrinterError(
f"Classic Bluetooth connection failed for '{self._address}' (channel {self._channel}): {exc}"
) from exc
except Exception:
sock.close()
raise
self._sock = sock
return self
async def __aexit__(self, *exc) -> None:
if self._reader_task is not None:
self._reader_task.cancel()
try:
await self._reader_task
except asyncio.CancelledError:
pass
self._reader_task = None
if self._sock is not None:
self._sock.close()
self._sock = None
async def write_gatt_char(self, _uuid: str, data: bytes, response: bool = False) -> None:
loop = asyncio.get_running_loop()
await loop.sock_sendall(self._sock, data)
async def start_notify(self, _uuid: str, callback) -> None:
self._reader_task = asyncio.create_task(self._reader_loop(callback))
async def _reader_loop(self, callback) -> None:
loop = asyncio.get_running_loop()
while True:
try:
data = await loop.sock_recv(self._sock, 1024)
except (OSError, asyncio.CancelledError):
return
if not data:
return
callback(None, bytearray(data))
# --- Client ---
class PrinterClient:
def __init__(self, client: BleakClient):
self.client = client
self._buf = bytearray()
self._event = asyncio.Event()
self._lock = asyncio.Lock()
self._is_classic = getattr(client, "is_classic", False)
def _on_notify(self, _char: BleakGATTCharacteristic, data: bytearray) -> None:
self._buf.extend(data)
self._event.set()
async def start(self) -> None:
await self.client.start_notify(NOTIFY_UUID, self._on_notify)
async def send(self, data: bytes, wait: bool = False, timeout: float = 2.0) -> bytes:
async with self._lock:
if wait:
self._buf.clear()
self._event.clear()
await self.client.write_gatt_char(WRITE_UUID, data, response=False)
if wait:
try:
await asyncio.wait_for(self._event.wait(), timeout=timeout)
await asyncio.sleep(DELAY_NOTIFY_EXTRA)
except asyncio.TimeoutError:
raise PrinterTimeout(f"No response within {timeout}s")
return bytes(self._buf)
async def send_chunked(self, data: bytes, chunk_size: int | None = None) -> None:
if chunk_size is None:
chunk_size = CHUNK_SIZE_CLASSIC if self._is_classic else CHUNK_SIZE_BLE
delay = 0 if self._is_classic else DELAY_CHUNK_GAP
async with self._lock:
for i in range(0, len(data), chunk_size):
chunk = data[i : i + chunk_size]
await self.client.write_gatt_char(WRITE_UUID, chunk, response=False)
if delay:
await asyncio.sleep(delay)
# --- Info commands (all tested and confirmed on D11s fw 2.4.6) ---
async def get_model(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF0]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_firmware(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF1]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_serial(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xF2]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_boot_version(self) -> str:
r = await self.send(bytes([0x10, 0xFF, 0x20, 0xEF]), wait=True)
return r.decode(errors="replace").strip() if r else "?"
async def get_battery(self) -> int:
r = await self.send(bytes([0x10, 0xFF, 0x50, 0xF1]), wait=True)
if r and len(r) >= 2:
return r[-1]
return -1
async def get_status(self) -> PrinterStatus:
r = await self.send(bytes([0x10, 0xFF, 0x40]), wait=True)
if r:
return PrinterStatus(r[-1])
return PrinterStatus(0xFF)
async def get_density(self) -> bytes:
r = await self.send(bytes([0x10, 0xFF, 0x11]), wait=True)
return r
async def get_shutdown_time(self) -> int:
"""Returns auto-shutdown timeout in minutes."""
r = await self.send(bytes([0x10, 0xFF, 0x13]), wait=True)
if r and len(r) >= 2:
return (r[0] << 8) | r[1]
return -1
async def get_all_info(self) -> dict:
"""10 FF 70: returns pipe-delimited string with all device info."""
r = await self.send(bytes([0x10, 0xFF, 0x70]), wait=True)
if not r:
return {}
parts = r.decode(errors="replace").split("|")
if len(parts) >= 6:
return {
"bt_name": parts[0],
"mac_classic": parts[1],
"mac_ble": parts[2],
"firmware": parts[3],
"serial": parts[4],
"battery": f"{parts[5]}%",
}
return {"raw": r.decode(errors="replace")}
# --- Config commands (tested on D11s) ---
async def set_density(self, level: int) -> bool:
"""0=light, 1=medium, 2=thick. Returns True if printer responded OK."""
r = await self.send(bytes([0x10, 0xFF, 0x10, 0x00, level]), wait=True)
return r == b"OK"
async def set_paper_type(self, paper: int = PAPER_GAP) -> bool:
"""0=gap/label, 1=black mark, 2=continuous."""
r = await self.send(bytes([0x10, 0xFF, 0x84, paper]), wait=True)
return r == b"OK"
async def set_shutdown_time(self, minutes: int) -> bool:
hi = (minutes >> 8) & 0xFF
lo = minutes & 0xFF
r = await self.send(bytes([0x10, 0xFF, 0x12, hi, lo]), wait=True)
return r == b"OK"
async def factory_reset(self) -> bool:
r = await self.send(bytes([0x10, 0xFF, 0x04]), wait=True)
return r == b"OK"
# --- Print control (AiYin-specific, from decompiled APK) ---
async def wakeup(self) -> None:
await self.send(b"\x00" * 12)
async def enable(self) -> None:
"""AiYin enable: 10 FF FE 01 (NOT 10 FF F1 03)."""
await self.send(bytes([0x10, 0xFF, 0xFE, 0x01]))
async def feed_dots(self, dots: int) -> None:
"""Feed paper forward by n dots."""
await self.send(bytes([0x1B, 0x4A, dots & 0xFF]))
async def form_feed(self) -> None:
"""Position to next label."""
await self.send(bytes([0x1D, 0x0C]))
async def stop_print(self) -> bool:
"""AiYin stop: 10 FF FE 45. Waits for 0xAA or 'OK'."""
r = await self.send(bytes([0x10, 0xFF, 0xFE, 0x45]), wait=True, timeout=60.0)
if r:
return r[0] == 0xAA or r.startswith(b"OK")
return False
async def get_info(self) -> dict:
status = await self.get_status()
return {
"model": await self.get_model(),
"firmware": await self.get_firmware(),
"boot": await self.get_boot_version(),
"serial": await self.get_serial(),
"battery": f"{await self.get_battery()}%",
"status": str(status),
"shutdown": f"{await self.get_shutdown_time()} min",
}
@asynccontextmanager
async def connect(
address: str | None = None,
classic: bool = False,
channel: int = RFCOMM_CHANNEL,
) -> AsyncGenerator[PrinterClient, None]:
"""Discover printer, connect, and yield a ready PrinterClient."""
if classic:
if not address:
raise PrinterError("--address is required for Classic Bluetooth (no scanning)")
# D11s variants are commonly exposed on channel 1 or 3.
candidates = [channel, 1, 2, 3]
channels = [ch for i, ch in enumerate(candidates) if ch > 0 and ch not in candidates[:i]]
last_exc: Exception | None = None
for ch in channels:
try:
async with RFCOMMClient(address, ch) as client:
pc = PrinterClient(client)
await pc.start()
yield pc
return
except (PrinterError, PrinterTimeout) as exc:
last_exc = exc
if last_exc is not None:
raise PrinterError(
f"Classic Bluetooth connection failed for '{address}'. "
f"Tried channels: {channels}. Last error: {last_exc}"
) from last_exc
raise PrinterError(f"Classic Bluetooth connection failed for '{address}'.")
else:
target = await resolve_ble_target(address)
def _is_retryable_ble_error(exc: Exception) -> bool:
msg = str(exc).lower()
return any(
token in msg
for token in (
"timeout",
"timed out",
"br-connection-timeout",
"failed to discover services",
"device disconnected",
)
)
last_exc: Exception | None = None
forced_rescan_done = False
for attempt in range(1, BLE_CONNECT_RETRIES + 1):
try:
async with BleakClient(target) as client:
pc = PrinterClient(client)
await pc.start()
yield pc
return
except asyncio.TimeoutError as exc:
last_exc = exc
if attempt < BLE_CONNECT_RETRIES:
await asyncio.sleep(BLE_CONNECT_BACKOFF * attempt)
continue
raise PrinterError(f"BLE connection timed out: {exc}") from exc
except BleakDBusError as exc:
msg = str(exc).lower()
if "br-connection-not-supported" in msg:
last_exc = exc
if not forced_rescan_done:
forced_rescan_done = True
target = await resolve_ble_target(None)
if attempt < BLE_CONNECT_RETRIES:
await asyncio.sleep(BLE_CONNECT_BACKOFF * attempt)
continue
raise PrinterError(
"BLE connection failed (br-connection-not-supported) after LE rescan. "
"Try Classic Bluetooth with classic=true and channel=1."
) from exc
last_exc = exc
if _is_retryable_ble_error(exc) and attempt < BLE_CONNECT_RETRIES:
await asyncio.sleep(BLE_CONNECT_BACKOFF * attempt)
continue
raise PrinterError(f"BLE connection failed: {exc}") from exc
except BleakError as exc:
last_exc = exc
if _is_retryable_ble_error(exc) and attempt < BLE_CONNECT_RETRIES:
await asyncio.sleep(BLE_CONNECT_BACKOFF * attempt)
continue
raise PrinterError(f"BLE error: {exc}") from exc
if last_exc is not None:
raise PrinterError(
f"BLE connection failed after {BLE_CONNECT_RETRIES} attempts: {last_exc}"
) from last_exc
raise PrinterError("BLE connection failed for unknown reason.")

View File

@@ -0,0 +1,13 @@
configuration:
port:
name: "API-Port"
description: "Port des REST-API-Servers. Den obigen Port-Mapping-Eintrag entsprechend anpassen."
ble_address:
name: "Bluetooth-Adresse"
description: "Feste BLE-Adresse des Druckers (z.B. AA:BB:CC:DD:EE:FF). Leer lassen für automatischen Scan."
transport:
name: "Transport"
description: "Verbindungsart: 'ble' für Bluetooth Low Energy (Standard) oder 'classic' für RFCOMM."
channel:
name: "RFCOMM-Kanal"
description: "Classic-Bluetooth-RFCOMM-Kanal. Nur relevant wenn Transport auf 'classic' gesetzt ist."

View File

@@ -1,13 +1,13 @@
configuration: configuration:
port: port:
name: "API-Port" name: "API Port"
description: "Port des REST-API-Servers. Den obigen Port-Mapping-Eintrag entsprechend anpassen." description: "Port for the REST API server. Adjust the port mapping entry above accordingly."
ble_address: ble_address:
name: "Bluetooth-Adresse" name: "Bluetooth Address"
description: "Feste BLE-Adresse des Druckers (z.B. AA:BB:CC:DD:EE:FF). Leer lassen für automatischen Scan." description: "Fixed BLE address of the printer (e.g., AA:BB:CC:DD:EE:FF). Leave empty for automatic scan."
transport: transport:
name: "Transport" name: "Transport"
description: "Verbindungsart: 'ble' für Bluetooth Low Energy (Standard) oder 'classic' für RFCOMM." description: "Connection type: 'ble' for Bluetooth Low Energy (default) or 'classic' for RFCOMM."
channel: channel:
name: "RFCOMM-Kanal" name: "RFCOMM Channel"
description: "Classic-Bluetooth-RFCOMM-Kanal. Nur relevant wenn Transport auf 'classic' gesetzt ist." description: "Classic Bluetooth RFCOMM channel. Only relevant if transport is set to 'classic'."

View File

@@ -1,28 +1,31 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[project] [project]
name = "fichero-printer" name = "fichero-printer"
version = "0.1.15" version = "0.1.23"
description = "Fichero D11s thermal label printer - BLE CLI tool" description = "Web GUI, Python CLI, and protocol documentation for the Fichero D11s thermal label printer."
readme = "README.md"
requires-python = ">=3.10" requires-python = ">=3.10"
license = {text = "MIT"}
authors = [
{name = "0xMH"},
{name = "Paul Kozber"},
]
dependencies = [ dependencies = [
"bleak", "bleak",
"numpy", "numpy",
"pillow", "Pillow",
] "fastapi",
"uvicorn[standard]",
[project.optional-dependencies]
api = [
"fastapi>=0.111",
"uvicorn[standard]>=0.29",
"python-multipart>=0.0.9", "python-multipart>=0.0.9",
] ]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["fichero"]
[project.scripts] [project.scripts]
fichero = "fichero.cli:main" fichero = "fichero.cli:main"
fichero-server = "fichero.api:main" fichero-server = "fichero.api:main"
[tool.setuptools.packages.find]
where = ["."]
include = ["fichero*"]