419 lines
16 KiB
Python
419 lines
16 KiB
Python
"""HTTP REST API for the Fichero D11s thermal label printer.
|
||
|
||
Start with:
    fichero-server [--host HOST] [--port PORT]
or:
    python -m fichero.api

Endpoints:
    GET  /status       – Printer status
    GET  /info         – Printer info (model, firmware, battery, …)
    POST /pair         – Pair and trust a Bluetooth device
    POST /unpair       – Unpair a Bluetooth device
    POST /print/text   – Print a text label
    POST /print/image  – Print an uploaded image file
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import asyncio
|
||
import io
|
||
import re
|
||
import os
|
||
from contextlib import asynccontextmanager
|
||
from pathlib import Path
|
||
from typing import Annotated
|
||
|
||
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from fastapi.openapi.docs import get_swagger_ui_html
|
||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||
from PIL import Image
|
||
|
||
from fichero.cli import DOTS_PER_MM, do_print
|
||
from fichero.imaging import text_to_image
|
||
from fichero.printer import (
|
||
PAPER_GAP,
|
||
PrinterError,
|
||
PrinterNotFound,
|
||
PrinterTimeout,
|
||
connect,
|
||
)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Global connection settings (env vars or CLI flags at startup)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Printer address used when a request does not supply one (None when unset).
_DEFAULT_ADDRESS: str | None = os.getenv("FICHERO_ADDR")
# Classic Bluetooth is the default transport only when FICHERO_TRANSPORT
# equals "classic" (compared case-insensitively).
_DEFAULT_CLASSIC: bool = "classic" == os.getenv("FICHERO_TRANSPORT", "").lower()
# RFCOMM channel; a non-integer FICHERO_CHANNEL raises ValueError at import.
_DEFAULT_CHANNEL: int = int(os.getenv("FICHERO_CHANNEL", "1"))

# Symbolic paper-type names accepted by the API and their numeric codes.
_PAPER_MAP = dict(gap=0, black=1, continuous=2)
|
||
|
||
|
||
def _parse_paper(value: str) -> int:
    """Translate a paper-type form value into its numeric code.

    Accepts either a key of ``_PAPER_MAP`` or a string that ``int()`` parses
    to 0, 1 or 2.

    Raises:
        HTTPException: 422 when *value* is neither a known name nor 0-2.
    """
    named_code = _PAPER_MAP.get(value)
    if named_code is not None:
        return named_code

    try:
        numeric_code = int(value)
    except ValueError:
        numeric_code = None

    if numeric_code is not None and numeric_code in range(3):
        return numeric_code

    raise HTTPException(status_code=422, detail=f"Invalid paper type '{value}'. Use gap, black, continuous or 0-2.")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# FastAPI app
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@asynccontextmanager
async def lifespan(app: FastAPI):  # noqa: ARG001
    """Application lifespan hook; performs no startup or shutdown work."""
    yield None
|
||
|
||
|
||
# The built-in documentation routes are switched off (docs_url / redoc_url
# set to None); this module registers its own "/docs" route instead.
app = FastAPI(
    lifespan=lifespan,
    title="Fichero Printer API",
    version="0.1.28",
    description="REST API for the Fichero D11s (AiYin) thermal label printer.",
    redoc_url=None,
    docs_url=None,
)

# CORS is fully open: every origin, method and header is allowed.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
|
||
|
||
|
||
def _address(address: str | None) -> str | None:
    """Pick the printer address for a request.

    A non-empty *address* wins; otherwise the module-level default
    (``_DEFAULT_ADDRESS``, which may itself be ``None``) is returned.
    """
    if address:
        return address
    return _DEFAULT_ADDRESS
|
||
|
||
|
||
def _ui_html() -> str:
    """Render ``index.html`` (next to this module) with the current defaults.

    The placeholders ``{default_address}``, ``{ble_selected}``,
    ``{classic_selected}`` and ``{default_channel}`` are substituted, in that
    order, with values derived from the module-level connection defaults.
    Returns a short HTML error snippet when the template file is missing.
    """
    page_path = Path(__file__).parent / "index.html"
    try:
        page = page_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return "<h1>Error: index.html not found</h1>"

    transport = "classic" if _DEFAULT_CLASSIC else "ble"

    # Applied sequentially, so the order of the pairs is significant.
    substitutions = (
        ("{default_address}", _DEFAULT_ADDRESS or ""),
        ("{ble_selected}", " selected" if transport == "ble" else ""),
        ("{classic_selected}", " selected" if transport == "classic" else ""),
        ("{default_channel}", str(_DEFAULT_CHANNEL)),
    )
    for placeholder, replacement in substitutions:
        page = page.replace(placeholder, replacement)
    return page
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Endpoints
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@app.get("/", include_in_schema=False, response_class=HTMLResponse)
async def root():
    """Serve a compact printer UI for Home Assistant."""
    page = _ui_html()
    return HTMLResponse(content=page)
|
||
|
||
|
||
@app.get("/docs", include_in_schema=False)
async def docs():
    """Serve Swagger UI with ingress-safe relative OpenAPI URL."""
    # A relative URL (no leading slash) is resolved by the browser against
    # the path the page was served from.
    page_title = f"{app.title} - Swagger UI"
    return get_swagger_ui_html(title=page_title, openapi_url="openapi.json")
|
||
|
||
|
||
@app.get(
    "/status",
    summary="Get printer status",
    response_description="Current printer status flags",
)
async def get_status(
    address: str | None = None,
    classic: bool | None = None,
    channel: int | None = None,
):
    """Return the real-time status of the printer (paper, battery, heat, …).

    Args:
        address: Printer address; falls back to the server default when empty.
        classic: Use Classic Bluetooth RFCOMM; ``None`` selects the server default.
        channel: RFCOMM channel; ``None`` selects the server default.

    Returns:
        A dict with the flags ``ok``, ``printing``, ``cover_open``, ``no_paper``,
        ``low_battery``, ``overheated``, ``charging`` and the ``raw`` status value.

    Raises:
        HTTPException: 404 for ``PrinterNotFound``, 504 for ``PrinterTimeout``,
            502 for any other ``PrinterError``.
    """
    # Resolve the defaults per request: a parameter default written as
    # ``= _DEFAULT_CLASSIC`` is frozen at import time and would ignore the
    # values that main() assigns from the command line afterwards.
    use_classic = _DEFAULT_CLASSIC if classic is None else classic
    rfcomm_channel = _DEFAULT_CHANNEL if channel is None else channel

    try:
        async with connect(_address(address), classic=use_classic, channel=rfcomm_channel) as pc:
            status = await pc.get_status()
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc

    return {
        "ok": status.ok,
        "printing": status.printing,
        "cover_open": status.cover_open,
        "no_paper": status.no_paper,
        "low_battery": status.low_battery,
        "overheated": status.overheated,
        "charging": status.charging,
        "raw": status.raw,
    }
|
||
|
||
|
||
@app.get(
    "/info",
    summary="Get printer info",
    response_description="Model, firmware, serial number and battery level",
)
async def get_info(
    address: str | None = None,
    classic: bool | None = None,
    channel: int | None = None,
):
    """Return static and dynamic printer information.

    The result of ``get_info()`` is merged with the result of
    ``get_all_info()``; keys from the latter win on conflict.

    Args:
        address: Printer address; falls back to the server default when empty.
        classic: Use Classic Bluetooth RFCOMM; ``None`` selects the server default.
        channel: RFCOMM channel; ``None`` selects the server default.

    Raises:
        HTTPException: 404 for ``PrinterNotFound``, 504 for ``PrinterTimeout``,
            502 for any other ``PrinterError``.
    """
    # Resolve the defaults per request: a parameter default written as
    # ``= _DEFAULT_CLASSIC`` is frozen at import time and would ignore the
    # values that main() assigns from the command line afterwards.
    use_classic = _DEFAULT_CLASSIC if classic is None else classic
    rfcomm_channel = _DEFAULT_CHANNEL if channel is None else channel

    try:
        async with connect(_address(address), classic=use_classic, channel=rfcomm_channel) as pc:
            info = await pc.get_info()
            info.update(await pc.get_all_info())
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc

    return info
|
||
|
||
|
||
@app.post(
    "/pair",
    summary="Pair and trust a Bluetooth device",
    status_code=200,
)
async def pair_device(
    address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None,
):
    """Attempt to pair and trust the device using `bluetoothctl`.

    This is intended for setting up Classic Bluetooth connections.

    The commands ``pair``, ``trust`` and ``quit`` are written to the standard
    input of a directly executed ``bluetoothctl`` process. No shell is
    involved, so the result does not depend on how ``/bin/sh`` implements
    ``echo -e`` and the address is never interpreted by a shell.

    Returns:
        A dict with ``ok``, ``message`` and the captured ``output`` / ``error``.

    Raises:
        HTTPException: 422 when no address is available or it is not a MAC
            address, 500 when ``bluetoothctl`` is not installed, 504 when it
            does not finish within 15 seconds, 502 when its output reports
            a failure.
    """
    addr = _address(address)
    if not addr:
        raise HTTPException(status_code=422, detail="Address is required to pair.")

    # fullmatch (unlike match with "$") also rejects a trailing newline.
    if not re.fullmatch(r"[0-9A-F]{2}(:[0-9A-F]{2}){5}", addr, re.IGNORECASE):
        raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}")

    script = f"pair {addr}\ntrust {addr}\nquit\n".encode()

    try:
        proc = await asyncio.create_subprocess_exec(
            "bluetoothctl",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except FileNotFoundError as exc:
        raise HTTPException(
            status_code=500,
            detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?",
        ) from exc

    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(script), timeout=15.0)
    except asyncio.TimeoutError as exc:
        # Do not leave the child running after giving up on it.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # already exited
        await proc.wait()
        raise HTTPException(
            status_code=504,
            detail="`bluetoothctl` command timed out after 15 seconds.",
        ) from exc

    output = stdout.decode(errors="ignore")
    error = stderr.decode(errors="ignore")

    if "Failed to pair" in output or "not available" in output.lower():
        raise HTTPException(status_code=502, detail=f"Pairing failed. Output: {output}. Error: {error}")

    return {"ok": True, "message": "Pair/trust command sent. Check output for details.", "output": output, "error": error}
|
||
|
||
|
||
@app.post(
    "/unpair",
    summary="Unpair a Bluetooth device",
    status_code=200,
)
async def unpair_device(
    address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None,
):
    """Attempt to unpair the device using `bluetoothctl`.

    The commands ``remove`` and ``quit`` are written to the standard input of
    a directly executed ``bluetoothctl`` process. No shell is involved, so
    the result does not depend on how ``/bin/sh`` implements ``echo -e`` and
    the address is never interpreted by a shell.

    Returns:
        A dict with ``ok``, ``message`` and the captured ``output`` / ``error``.

    Raises:
        HTTPException: 422 when no address is available or it is not a MAC
            address, 500 when ``bluetoothctl`` is not installed, 504 when it
            does not finish within 15 seconds, 502 when its output reports
            a failure.
    """
    addr = _address(address)
    if not addr:
        raise HTTPException(status_code=422, detail="Address is required to unpair.")

    # fullmatch (unlike match with "$") also rejects a trailing newline.
    if not re.fullmatch(r"[0-9A-F]{2}(:[0-9A-F]{2}){5}", addr, re.IGNORECASE):
        raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}")

    script = f"remove {addr}\nquit\n".encode()

    try:
        proc = await asyncio.create_subprocess_exec(
            "bluetoothctl",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except FileNotFoundError as exc:
        raise HTTPException(
            status_code=500,
            detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?",
        ) from exc

    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(script), timeout=15.0)
    except asyncio.TimeoutError as exc:
        # Do not leave the child running after giving up on it.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # already exited
        await proc.wait()
        raise HTTPException(
            status_code=504,
            detail="`bluetoothctl` command timed out after 15 seconds.",
        ) from exc

    output = stdout.decode(errors="ignore")
    error = stderr.decode(errors="ignore")

    if "Failed to remove" in output or "not available" in output.lower():
        raise HTTPException(status_code=502, detail=f"Unpairing failed. Output: {output}. Error: {error}")

    return {"ok": True, "message": "Unpair command sent. Check output for details.", "output": output, "error": error}
|
||
|
||
@app.post(
    "/print/text",
    summary="Print a text label",
    status_code=200,
)
async def print_text(
    text: Annotated[str, Form(description="Text to print on the label")],
    density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
    paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
    copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
    font_size: Annotated[int, Form(description="Font size in points", ge=6, le=200)] = 30,
    label_length: Annotated[int | None, Form(description="Label length in mm (overrides label_height)", ge=5, le=500)] = None,
    label_height: Annotated[int, Form(description="Label height in pixels", ge=40, le=4000)] = 240,
    address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
    classic: Annotated[bool | None, Form(description="Use Classic Bluetooth RFCOMM")] = None,
    channel: Annotated[int | None, Form(description="RFCOMM channel")] = None,
):
    """Print a plain-text label.

    The text is rendered as a 96 px wide, 1-bit image and sent to the printer.

    ``classic`` and ``channel`` fall back to the server defaults when omitted.

    Returns:
        ``{"ok": True, "copies": ..., "text": ...}`` on success.

    Raises:
        HTTPException: 422 for an invalid paper type, 404 for
            ``PrinterNotFound``, 504 for ``PrinterTimeout``, 502 for any other
            ``PrinterError`` or when the printer does not confirm completion.
    """
    # Resolve the defaults per request: a parameter default written as
    # ``= _DEFAULT_CLASSIC`` is frozen at import time and would ignore the
    # values that main() assigns from the command line afterwards.
    use_classic = _DEFAULT_CLASSIC if classic is None else classic
    rfcomm_channel = _DEFAULT_CHANNEL if channel is None else channel

    paper_val = _parse_paper(paper)
    # A length given in millimetres takes precedence over the pixel height.
    max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height

    img = text_to_image(text, font_size=font_size, label_height=max_rows)

    try:
        async with connect(_address(address), classic=use_classic, channel=rfcomm_channel) as pc:
            ok = await do_print(
                pc,
                img,
                density=density,
                paper=paper_val,
                copies=copies,
                dither=False,
                max_rows=max_rows,
            )
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc

    if not ok:
        raise HTTPException(status_code=502, detail="Printer did not confirm completion.")

    return {"ok": True, "copies": copies, "text": text}
|
||
|
||
|
||
@app.post(
    "/print/image",
    summary="Print an image",
    status_code=200,
)
async def print_image(
    file: Annotated[UploadFile, File(description="Image file to print (PNG, JPEG, BMP, …)")],
    density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
    paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
    copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
    dither: Annotated[bool, Form(description="Apply Floyd-Steinberg dithering")] = True,
    label_length: Annotated[int | None, Form(description="Max label length in mm (overrides label_height)", ge=5, le=500)] = None,
    label_height: Annotated[int, Form(description="Max label height in pixels", ge=40, le=4000)] = 240,
    address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
    classic: Annotated[bool | None, Form(description="Use Classic Bluetooth RFCOMM")] = None,
    channel: Annotated[int | None, Form(description="RFCOMM channel")] = None,
):
    """Print an image file.

    The image is resized to 96 px wide, optionally dithered to 1-bit, and sent to the printer.
    Supported formats: PNG, JPEG, BMP, GIF, TIFF, WEBP.

    ``classic`` and ``channel`` fall back to the server defaults when omitted.

    Returns:
        ``{"ok": True, "copies": ..., "filename": ...}`` on success.

    Raises:
        HTTPException: 422 for an empty upload, undecodable image data or an
            invalid paper type, 404 for ``PrinterNotFound``, 504 for
            ``PrinterTimeout``, 502 for any other ``PrinterError`` or when the
            printer does not confirm completion.
    """
    # Resolve the defaults per request: a parameter default written as
    # ``= _DEFAULT_CLASSIC`` is frozen at import time and would ignore the
    # values that main() assigns from the command line afterwards.
    use_classic = _DEFAULT_CLASSIC if classic is None else classic
    rfcomm_channel = _DEFAULT_CHANNEL if channel is None else channel

    # Validate content type loosely — Pillow will raise on unsupported data
    data = await file.read()
    if not data:
        raise HTTPException(status_code=422, detail="Uploaded file is empty.")

    try:
        img = Image.open(io.BytesIO(data))
        img.load()  # ensure the image is fully decoded
    except Exception as exc:
        # Boundary handler: any decoding failure is reported to the client as 422.
        raise HTTPException(status_code=422, detail=f"Cannot decode image: {exc}") from exc

    paper_val = _parse_paper(paper)
    # A length given in millimetres takes precedence over the pixel height.
    max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height

    try:
        async with connect(_address(address), classic=use_classic, channel=rfcomm_channel) as pc:
            ok = await do_print(
                pc,
                img,
                density=density,
                paper=paper_val,
                copies=copies,
                dither=dither,
                max_rows=max_rows,
            )
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc

    if not ok:
        raise HTTPException(status_code=502, detail="Printer did not confirm completion.")

    return {"ok": True, "copies": copies, "filename": file.filename}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Entry point
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> None:
    """Start the Fichero HTTP API server.

    Parses ``--host``, ``--port``, ``--address``, ``--classic``, ``--channel``
    and ``--reload`` from the command line, stores the connection overrides in
    the module-level defaults and runs uvicorn.

    Raises:
        SystemExit: with status 1 when uvicorn is not installed.
    """
    global _DEFAULT_ADDRESS, _DEFAULT_CLASSIC, _DEFAULT_CHANNEL

    try:
        import uvicorn  # noqa: PLC0415
    except ImportError:
        print("ERROR: uvicorn is required to run the API server.")
        print("Install it with: pip install 'fichero-printer[api]'")
        raise SystemExit(1) from None

    parser = argparse.ArgumentParser(description="Fichero Printer API Server")
    parser.add_argument("--host", default="127.0.0.1", help="Bind host (default: 127.0.0.1)")
    parser.add_argument("--port", type=int, default=8765, help="Bind port (default: 8765)")
    parser.add_argument("--address", default=_DEFAULT_ADDRESS, metavar="BLE_ADDR",
                        help="Default BLE address (or set FICHERO_ADDR env var)")
    parser.add_argument("--classic", action="store_true", default=_DEFAULT_CLASSIC,
                        help="Default to Classic Bluetooth RFCOMM")
    parser.add_argument("--channel", type=int, default=_DEFAULT_CHANNEL,
                        help="Default RFCOMM channel (default: 1)")
    parser.add_argument("--reload", action="store_true", help="Enable auto-reload (development)")
    args = parser.parse_args()

    # Rebind the module-level defaults so that code reading them at request
    # time sees the command-line values.
    _DEFAULT_ADDRESS = args.address
    _DEFAULT_CLASSIC = args.classic
    _DEFAULT_CHANNEL = args.channel

    if args.reload:
        # The reload worker re-imports this module in a separate process, where
        # the globals assigned above do not exist. Hand the overrides over via
        # the environment variables the module reads at import time.
        if args.address:
            os.environ["FICHERO_ADDR"] = args.address
        if args.classic:
            os.environ["FICHERO_TRANSPORT"] = "classic"
        os.environ["FICHERO_CHANNEL"] = str(args.channel)

    # Pass the app object directly when not reloading so that the module-level
    # globals (_DEFAULT_ADDRESS etc.) set above are the ones the running app uses.
    # The string form "fichero.api:app" is required for --reload only, because
    # uvicorn's reloader needs to re-import the module in a worker process.
    uvicorn.run(
        "fichero.api:app" if args.reload else app,
        host=args.host,
        port=args.port,
        reload=args.reload,
    )


if __name__ == "__main__":
    main()
|