Files
Fichero/fichero_printer/fichero/api.py

656 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""HTTP REST API for the Fichero D11s thermal label printer.
Start with:
fichero-server [--host HOST] [--port PORT]
or:
python -m fichero.api
Endpoints:
GET / Built-in web UI
GET /docs Swagger UI
GET /scan Scan for a printer via BLE
GET /status Printer status
GET /info Printer info (model, firmware, battery, …)
POST /print/text Print a text label
POST /print/image Print an uploaded image file
"""
from __future__ import annotations
import argparse
import asyncio
import io
import os
from contextlib import asynccontextmanager
from typing import Annotated
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.responses import HTMLResponse, RedirectResponse
from PIL import Image
from fichero.cli import DOTS_PER_MM, do_print
from fichero.imaging import text_to_image
from fichero.printer import (
PAPER_GAP,
PrinterError,
PrinterNotFound,
PrinterTimeout,
connect,
find_printer,
)
# ---------------------------------------------------------------------------
# Global connection settings (env vars or CLI flags at startup)
# ---------------------------------------------------------------------------
_DEFAULT_ADDRESS: str | None = os.environ.get("FICHERO_ADDR")
_DEFAULT_CLASSIC: bool = os.environ.get("FICHERO_TRANSPORT", "").lower() == "classic"
_DEFAULT_CHANNEL: int = int(os.environ.get("FICHERO_CHANNEL", "1"))
_PAPER_MAP = {"gap": 0, "black": 1, "continuous": 2}
def _parse_paper(value: str) -> int:
    """Translate a paper-type request value into its numeric code.

    Accepts either a key of ``_PAPER_MAP`` or a string that ``int()`` parses
    to 0, 1 or 2.

    Raises:
        HTTPException: status 422 when *value* is neither of the above.
    """
    # Symbolic names first. Compare against None because code 0 is falsy.
    named_code = _PAPER_MAP.get(value)
    if named_code is not None:
        return named_code
    # Otherwise try a numeric code; anything unparsable counts as "no match".
    try:
        numeric_code = int(value)
    except ValueError:
        numeric_code = None
    if numeric_code is not None and numeric_code in range(3):
        return numeric_code
    raise HTTPException(status_code=422, detail=f"Invalid paper type '{value}'. Use gap, black, continuous or 0-2.")
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):  # noqa: ARG001
    """Application lifespan hook; performs no work before or after the ``yield``."""
    yield
app = FastAPI(
    title="Fichero Printer API",
    description="REST API for the Fichero D11s (AiYin) thermal label printer.",
    version="0.1.9",
    lifespan=lifespan,
    # The built-in documentation routes are switched off; this module
    # registers its own "/docs" handler instead.
    docs_url=None,
    redoc_url=None,
)
# NOTE(review): CORS is fully open (any origin, method and header) — confirm
# this is intended for the deployments this server runs in.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
def _address(address: str | None) -> str | None:
    """Pick the printer address to use for a request.

    A non-empty *address* wins; ``None`` or an empty string falls back to the
    module-level ``_DEFAULT_ADDRESS`` (which may itself be ``None``).
    """
    if address:
        return address
    return _DEFAULT_ADDRESS
def _ui_html() -> str:
    """Build the HTML for the single-page printer console.

    The page is rendered from one f-string, so literal CSS/JS braces are
    doubled (``{{`` / ``}}``). The connection form is pre-filled from the
    module-level defaults, which are read each time this function runs.

    Returns:
        The complete HTML document as a string.
    """
    import html  # noqa: PLC0415

    # The address comes from an env var / CLI flag and lands inside an HTML
    # attribute, so escape it (quotes included) to keep the markup well-formed.
    default_address = html.escape(_DEFAULT_ADDRESS or "", quote=True)
    default_transport = "classic" if _DEFAULT_CLASSIC else "ble"
    # In showResponse the body is read exactly once as text and then parsed:
    # a fetch Response body cannot be read a second time after json() fails.
    return f"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Fichero Printer</title>
<style>
:root {{
--bg: #f4efe6;
--panel: #fffaf2;
--line: #d8cdbd;
--ink: #2d241d;
--muted: #6c6258;
--accent: #b55e33;
--accent-2: #245b4b;
}}
* {{ box-sizing: border-box; }}
body {{
margin: 0;
font-family: "Noto Sans", system-ui, sans-serif;
color: var(--ink);
background:
radial-gradient(circle at top left, #fff8ed 0, transparent 35%),
linear-gradient(180deg, #efe4d3 0%, var(--bg) 100%);
}}
main {{
max-width: 980px;
margin: 0 auto;
padding: 24px 16px 40px;
}}
.hero {{
margin-bottom: 20px;
padding: 24px;
border: 1px solid var(--line);
border-radius: 18px;
background: rgba(255, 250, 242, 0.92);
backdrop-filter: blur(4px);
}}
h1, h2 {{ margin: 0 0 12px; }}
.muted {{ color: var(--muted); }}
.grid {{
display: grid;
gap: 16px;
grid-template-columns: repeat(auto-fit, minmax(280px, 1fr));
}}
.card {{
padding: 18px;
border: 1px solid var(--line);
border-radius: 16px;
background: var(--panel);
box-shadow: 0 8px 24px rgba(45, 36, 29, 0.06);
}}
label {{
display: block;
margin: 10px 0 6px;
font-size: 0.92rem;
font-weight: 600;
}}
input, select, textarea, button {{
width: 100%;
border-radius: 10px;
border: 1px solid var(--line);
padding: 10px 12px;
font: inherit;
}}
textarea {{ min-height: 110px; resize: vertical; }}
.row {{
display: grid;
gap: 12px;
grid-template-columns: repeat(2, minmax(0, 1fr));
}}
.inline {{
display: flex;
gap: 10px;
align-items: center;
margin-top: 12px;
}}
.inline input[type="checkbox"] {{ width: auto; }}
button {{
cursor: pointer;
font-weight: 700;
color: #fff;
background: var(--accent);
border: none;
}}
button.alt {{ background: var(--accent-2); }}
pre {{
overflow: auto;
margin: 0;
padding: 12px;
border-radius: 12px;
background: #241f1a;
color: #f7efe4;
min-height: 140px;
}}
.actions {{
display: flex;
gap: 10px;
flex-wrap: wrap;
margin-top: 12px;
}}
.actions button {{
width: auto;
min-width: 140px;
}}
@media (max-width: 640px) {{
.row {{ grid-template-columns: 1fr; }}
.actions button {{ width: 100%; }}
}}
</style>
</head>
<body>
<main>
<section class="hero">
<h1>Fichero Printer</h1>
<p class="muted">Home Assistant print console for status, text labels, and image uploads.</p>
<p class="muted">API docs remain available at <a href="docs">/docs</a>.</p>
</section>
<section class="grid">
<div class="card">
<h2>Connection</h2>
<label for="address">Printer address</label>
<div style="display:flex;gap:8px;align-items:center">
<input id="address" value="{default_address}" placeholder="C9:48:8A:69:D5:C0" style="flex:1;width:auto">
<button type="button" id="scan-btn" class="alt" style="width:auto;white-space:nowrap" onclick="scanAddress()">&#128268; Scan</button>
</div>
<div class="row">
<div>
<label for="transport">Transport</label>
<select id="transport">
<option value="ble"{" selected" if default_transport == "ble" else ""}>BLE</option>
<option value="classic"{" selected" if default_transport == "classic" else ""}>Classic</option>
</select>
</div>
<div>
<label for="channel">RFCOMM channel</label>
<input id="channel" type="number" min="1" max="30" value="{_DEFAULT_CHANNEL}">
</div>
</div>
<div class="actions">
<button type="button" class="alt" onclick="runGet('status')">Get Status</button>
<button type="button" class="alt" onclick="runGet('info')">Get Info</button>
</div>
</div>
<div class="card">
<h2>Output</h2>
<pre id="output">Ready.</pre>
</div>
<div class="card">
<h2>Print Text</h2>
<label for="text">Text</label>
<textarea id="text" placeholder="Hello from Home Assistant"></textarea>
<div class="row">
<div>
<label for="text_density">Density</label>
<select id="text_density">
<option value="0">0</option>
<option value="1">1</option>
<option value="2" selected>2</option>
</select>
</div>
<div>
<label for="text_copies">Copies</label>
<input id="text_copies" type="number" min="1" max="99" value="1">
</div>
</div>
<div class="row">
<div>
<label for="text_font_size">Font size</label>
<input id="text_font_size" type="number" min="6" max="200" value="30">
</div>
<div>
<label for="text_label_length">Label length (mm)</label>
<input id="text_label_length" type="number" min="5" max="500" value="30">
</div>
</div>
<label for="text_paper">Paper</label>
<select id="text_paper">
<option value="gap" selected>gap</option>
<option value="black">black</option>
<option value="continuous">continuous</option>
</select>
<div class="actions">
<button type="button" onclick="printText()">Print Text</button>
</div>
</div>
<div class="card">
<h2>Print Image</h2>
<label for="image_file">Image file</label>
<input id="image_file" type="file" accept="image/*">
<div class="row">
<div>
<label for="image_density">Density</label>
<select id="image_density">
<option value="0">0</option>
<option value="1">1</option>
<option value="2" selected>2</option>
</select>
</div>
<div>
<label for="image_copies">Copies</label>
<input id="image_copies" type="number" min="1" max="99" value="1">
</div>
</div>
<div class="row">
<div>
<label for="image_label_length">Label length (mm)</label>
<input id="image_label_length" type="number" min="5" max="500" value="30">
</div>
<div class="inline">
<input id="image_dither" type="checkbox" checked>
<label for="image_dither">Enable dithering</label>
</div>
</div>
<label for="image_paper">Paper</label>
<select id="image_paper">
<option value="gap" selected>gap</option>
<option value="black">black</option>
<option value="continuous">continuous</option>
</select>
<div class="actions">
<button type="button" onclick="printImage()">Print Image</button>
</div>
</div>
</section>
</main>
<script>
function commonParams() {{
const address = document.getElementById("address").value.trim();
const classic = document.getElementById("transport").value === "classic";
const channel = document.getElementById("channel").value;
const params = new URLSearchParams();
if (address) params.set("address", address);
params.set("classic", String(classic));
params.set("channel", channel);
return params;
}}
async function showResponse(response) {{
const output = document.getElementById("output");
const raw = await response.text();
let data;
try {{
data = JSON.parse(raw);
}} catch {{
data = {{ detail: raw }};
}}
output.textContent = JSON.stringify({{ status: response.status, ok: response.ok, data }}, null, 2);
}}
async function runGet(path) {{
const response = await fetch(`${{path}}?${{commonParams().toString()}}`);
await showResponse(response);
}}
async function scanAddress() {{
const btn = document.getElementById("scan-btn");
const output = document.getElementById("output");
btn.disabled = true;
btn.textContent = "Scanning…";
output.textContent = "Scanning for printer (up to 8 s)…";
try {{
const response = await fetch("scan");
const data = await response.json();
if (response.ok && data.address) {{
document.getElementById("address").value = data.address;
output.textContent = JSON.stringify({{ status: response.status, ok: true, data }}, null, 2);
}} else {{
output.textContent = JSON.stringify({{ status: response.status, ok: false, data }}, null, 2);
}}
}} catch (e) {{
output.textContent = "Scan failed: " + e;
}} finally {{
btn.disabled = false;
btn.innerHTML = "&#128268; Scan";
}}
}}
async function printText() {{
const form = new FormData();
form.set("text", document.getElementById("text").value);
form.set("density", document.getElementById("text_density").value);
form.set("copies", document.getElementById("text_copies").value);
form.set("font_size", document.getElementById("text_font_size").value);
form.set("label_length", document.getElementById("text_label_length").value);
form.set("paper", document.getElementById("text_paper").value);
form.set("address", document.getElementById("address").value.trim());
form.set("classic", String(document.getElementById("transport").value === "classic"));
form.set("channel", document.getElementById("channel").value);
const response = await fetch("print/text", {{ method: "POST", body: form }});
await showResponse(response);
}}
async function printImage() {{
const fileInput = document.getElementById("image_file");
if (!fileInput.files.length) {{
document.getElementById("output").textContent = "Select an image file first.";
return;
}}
const form = new FormData();
form.set("file", fileInput.files[0]);
form.set("density", document.getElementById("image_density").value);
form.set("copies", document.getElementById("image_copies").value);
form.set("label_length", document.getElementById("image_label_length").value);
form.set("paper", document.getElementById("image_paper").value);
form.set("dither", String(document.getElementById("image_dither").checked));
form.set("address", document.getElementById("address").value.trim());
form.set("classic", String(document.getElementById("transport").value === "classic"));
form.set("channel", document.getElementById("channel").value);
const response = await fetch("print/image", {{ method: "POST", body: form }});
await showResponse(response);
}}
</script>
</body>
</html>"""
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/", include_in_schema=False, response_class=HTMLResponse)
async def root():
    """Return the built-in printer console page (excluded from the OpenAPI schema)."""
    page = _ui_html()
    return HTMLResponse(page)
@app.get("/docs", include_in_schema=False)
async def docs():
    """Render Swagger UI against a relative ``openapi.json`` URL.

    The OpenAPI URL is deliberately relative (no leading slash) so it stays
    ingress-safe.
    """
    page_title = f"{app.title} - Swagger UI"
    return get_swagger_ui_html(openapi_url="openapi.json", title=page_title)
@app.get(
    "/scan",
    summary="Scan for printer",
    response_description="BLE address of the discovered printer",
)
async def scan_printer():
    """Scan for a Fichero/D11s printer via BLE and return its address."""
    try:
        found = await find_printer()
    except PrinterNotFound as exc:
        # Nothing discovered: report it as "not found".
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except Exception as exc:
        # Any other failure during the scan is surfaced as a gateway error.
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    else:
        return {"address": found}
@app.get(
    "/status",
    summary="Get printer status",
    response_description="Current printer status flags",
)
async def get_status(
    address: str | None = None,
    classic: bool | None = None,
    channel: int | None = None,
):
    """Return the real-time status of the printer (paper, battery, heat, …).

    Omitted `address`, `classic` or `channel` values fall back to the
    server's configured defaults.
    """
    # Resolve the defaults per request. Binding them as parameter defaults
    # would freeze their import-time values, so overrides that main() writes
    # into the module globals from CLI flags would never be seen here.
    use_classic = _DEFAULT_CLASSIC if classic is None else classic
    rfcomm_channel = _DEFAULT_CHANNEL if channel is None else channel
    try:
        async with connect(_address(address), classic=use_classic, channel=rfcomm_channel) as pc:
            status = await pc.get_status()
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    return {
        "ok": status.ok,
        "printing": status.printing,
        "cover_open": status.cover_open,
        "no_paper": status.no_paper,
        "low_battery": status.low_battery,
        "overheated": status.overheated,
        "charging": status.charging,
        "raw": status.raw,
    }
@app.get(
    "/info",
    summary="Get printer info",
    response_description="Model, firmware, serial number and battery level",
)
async def get_info(
    address: str | None = None,
    classic: bool | None = None,
    channel: int | None = None,
):
    """Return static and dynamic printer information.

    Omitted `address`, `classic` or `channel` values fall back to the
    server's configured defaults.
    """
    # Resolve the defaults per request. Binding them as parameter defaults
    # would freeze their import-time values, so overrides that main() writes
    # into the module globals from CLI flags would never be seen here.
    use_classic = _DEFAULT_CLASSIC if classic is None else classic
    rfcomm_channel = _DEFAULT_CHANNEL if channel is None else channel
    try:
        async with connect(_address(address), classic=use_classic, channel=rfcomm_channel) as pc:
            info = await pc.get_info()
            # Merge the second query into the first; on a key clash the
            # get_all_info() value wins.
            info.update(await pc.get_all_info())
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    return info
@app.post(
    "/print/text",
    summary="Print a text label",
    status_code=200,
)
async def print_text(
    text: Annotated[str, Form(description="Text to print on the label")],
    density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
    paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
    copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
    font_size: Annotated[int, Form(description="Font size in points", ge=6, le=200)] = 30,
    label_length: Annotated[int | None, Form(description="Label length in mm (overrides label_height)", ge=5, le=500)] = None,
    label_height: Annotated[int, Form(description="Label height in pixels", ge=40, le=4000)] = 240,
    address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
    classic: Annotated[bool | None, Form(description="Use Classic Bluetooth RFCOMM")] = None,
    channel: Annotated[int | None, Form(description="RFCOMM channel")] = None,
):
    """Print a plain-text label.

    The text is rendered as a 96 px wide, 1-bit image and sent to the printer.
    Omitted `address`, `classic` or `channel` values fall back to the
    server's configured defaults.
    """
    # Resolve the defaults per request. Binding them as parameter defaults
    # would freeze their import-time values, so overrides that main() writes
    # into the module globals from CLI flags would never be seen here.
    use_classic = _DEFAULT_CLASSIC if classic is None else classic
    rfcomm_channel = _DEFAULT_CHANNEL if channel is None else channel
    paper_val = _parse_paper(paper)
    # A length in millimetres, when given, takes precedence over the pixel height.
    max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height
    img = text_to_image(text, font_size=font_size, label_height=max_rows)
    try:
        async with connect(_address(address), classic=use_classic, channel=rfcomm_channel) as pc:
            ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies,
                                dither=False, max_rows=max_rows)
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    if not ok:
        raise HTTPException(status_code=502, detail="Printer did not confirm completion.")
    return {"ok": True, "copies": copies, "text": text}
@app.post(
    "/print/image",
    summary="Print an image",
    status_code=200,
)
async def print_image(
    file: Annotated[UploadFile, File(description="Image file to print (PNG, JPEG, BMP, …)")],
    density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
    paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
    copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
    dither: Annotated[bool, Form(description="Apply Floyd-Steinberg dithering")] = True,
    label_length: Annotated[int | None, Form(description="Max label length in mm (overrides label_height)", ge=5, le=500)] = None,
    label_height: Annotated[int, Form(description="Max label height in pixels", ge=40, le=4000)] = 240,
    address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
    classic: Annotated[bool | None, Form(description="Use Classic Bluetooth RFCOMM")] = None,
    channel: Annotated[int | None, Form(description="RFCOMM channel")] = None,
):
    """Print an image file.

    The image is resized to 96 px wide, optionally dithered to 1-bit, and sent to the printer.
    Supported formats: PNG, JPEG, BMP, GIF, TIFF, WEBP.
    Omitted `address`, `classic` or `channel` values fall back to the
    server's configured defaults.
    """
    # Resolve the defaults per request. Binding them as parameter defaults
    # would freeze their import-time values, so overrides that main() writes
    # into the module globals from CLI flags would never be seen here.
    use_classic = _DEFAULT_CLASSIC if classic is None else classic
    rfcomm_channel = _DEFAULT_CHANNEL if channel is None else channel
    # Validate content type loosely — Pillow will raise on unsupported data
    data = await file.read()
    if not data:
        raise HTTPException(status_code=422, detail="Uploaded file is empty.")
    try:
        img = Image.open(io.BytesIO(data))
        img.load()  # ensure the image is fully decoded
    except Exception as exc:
        raise HTTPException(status_code=422, detail=f"Cannot decode image: {exc}") from exc
    paper_val = _parse_paper(paper)
    # A length in millimetres, when given, takes precedence over the pixel height.
    max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height
    try:
        async with connect(_address(address), classic=use_classic, channel=rfcomm_channel) as pc:
            ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies,
                                dither=dither, max_rows=max_rows)
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    if not ok:
        raise HTTPException(status_code=502, detail="Printer did not confirm completion.")
    return {"ok": True, "copies": copies, "filename": file.filename}
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """Start the Fichero HTTP API server.

    Parses ``--host``, ``--port``, ``--address``, ``--classic``, ``--channel``
    and ``--reload`` from the command line, stores the connection overrides in
    the module-level defaults, and runs uvicorn.

    Raises:
        SystemExit: with code 1 when uvicorn is not installed.
    """
    global _DEFAULT_ADDRESS, _DEFAULT_CLASSIC, _DEFAULT_CHANNEL
    try:
        import uvicorn  # noqa: PLC0415
    except ImportError:
        print("ERROR: uvicorn is required to run the API server.")
        print("Install it with: pip install 'fichero-printer[api]'")
        raise SystemExit(1) from None
    parser = argparse.ArgumentParser(description="Fichero Printer API Server")
    parser.add_argument("--host", default="127.0.0.1", help="Bind host (default: 127.0.0.1)")
    parser.add_argument("--port", type=int, default=8765, help="Bind port (default: 8765)")
    parser.add_argument("--address", default=_DEFAULT_ADDRESS, metavar="BLE_ADDR",
                        help="Default BLE address (or set FICHERO_ADDR env var)")
    parser.add_argument("--classic", action="store_true", default=_DEFAULT_CLASSIC,
                        help="Default to Classic Bluetooth RFCOMM")
    parser.add_argument("--channel", type=int, default=_DEFAULT_CHANNEL,
                        help="Default RFCOMM channel (default: 1)")
    parser.add_argument("--reload", action="store_true", help="Enable auto-reload (development)")
    args = parser.parse_args()
    # Push CLI overrides into module-level defaults for the in-process app.
    _DEFAULT_ADDRESS = args.address
    _DEFAULT_CLASSIC = args.classic
    _DEFAULT_CHANNEL = args.channel
    if args.reload:
        # With --reload the app is re-imported in a worker process, so the
        # globals assigned above are not visible there. Mirror the overrides
        # into the environment variables this module reads at import time;
        # the worker inherits the environment and picks them up.
        if args.address:
            os.environ["FICHERO_ADDR"] = args.address
        os.environ["FICHERO_TRANSPORT"] = "classic" if args.classic else "ble"
        os.environ["FICHERO_CHANNEL"] = str(args.channel)
    # Pass the app object directly when not reloading so the handlers share
    # this process's module globals. The string form "fichero.api:app" is
    # required for --reload only, because uvicorn's reloader needs to
    # re-import the module in a worker process.
    uvicorn.run(
        "fichero.api:app" if args.reload else app,
        host=args.host,
        port=args.port,
        reload=args.reload,
    )
# Allow starting the server with `python -m fichero.api`.
if __name__ == "__main__":
    main()