0.1.27
This commit is contained in:
418
fichero_printer/fichero/api.py
Normal file
418
fichero_printer/fichero/api.py
Normal file
@@ -0,0 +1,418 @@
|
||||
"""HTTP REST API for the Fichero D11s thermal label printer.
|
||||
|
||||
Start with:
|
||||
fichero-server [--host HOST] [--port PORT]
|
||||
or:
|
||||
python -m fichero.api
|
||||
|
||||
|
||||
Endpoints:
|
||||
GET /status – Printer status
|
||||
GET /info – Printer info (model, firmware, battery, …)
|
||||
POST /print/text – Print a text label
|
||||
POST /print/image – Print an uploaded image file
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import io
|
||||
import re
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.openapi.docs import get_swagger_ui_html
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
from PIL import Image
|
||||
|
||||
from fichero.cli import DOTS_PER_MM, do_print
|
||||
from fichero.imaging import text_to_image
|
||||
from fichero.printer import (
|
||||
PAPER_GAP,
|
||||
PrinterError,
|
||||
PrinterNotFound,
|
||||
PrinterTimeout,
|
||||
connect,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Global connection settings (env vars or CLI flags at startup)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_DEFAULT_ADDRESS: str | None = os.environ.get("FICHERO_ADDR")
|
||||
_DEFAULT_CLASSIC: bool = os.environ.get("FICHERO_TRANSPORT", "").lower() == "classic"
|
||||
_DEFAULT_CHANNEL: int = int(os.environ.get("FICHERO_CHANNEL", "1"))
|
||||
|
||||
_PAPER_MAP = {"gap": 0, "black": 1, "continuous": 2}
|
||||
|
||||
|
||||
def _parse_paper(value: str) -> int:
|
||||
if value in _PAPER_MAP:
|
||||
return _PAPER_MAP[value]
|
||||
try:
|
||||
val = int(value)
|
||||
if 0 <= val <= 2:
|
||||
return val
|
||||
except ValueError:
|
||||
pass
|
||||
raise HTTPException(status_code=422, detail=f"Invalid paper type '{value}'. Use gap, black, continuous or 0-2.")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# FastAPI app
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI): # noqa: ARG001
|
||||
yield
|
||||
|
||||
|
||||
app = FastAPI(
|
||||
title="Fichero Printer API",
|
||||
description="REST API for the Fichero D11s (AiYin) thermal label printer.",
|
||||
version="0.1.26",
|
||||
lifespan=lifespan,
|
||||
docs_url=None,
|
||||
redoc_url=None,
|
||||
)
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
|
||||
def _address(address: str | None) -> str | None:
|
||||
"""Return the effective BLE address (request value overrides env default)."""
|
||||
return address or _DEFAULT_ADDRESS
|
||||
|
||||
|
||||
def _ui_html() -> str:
|
||||
default_address = _DEFAULT_ADDRESS or ""
|
||||
default_transport = "classic" if _DEFAULT_CLASSIC else "ble"
|
||||
|
||||
try:
|
||||
template_path = Path(__file__).parent / "index.html"
|
||||
template = template_path.read_text(encoding="utf-8")
|
||||
except FileNotFoundError:
|
||||
return "<h1>Error: index.html not found</h1>"
|
||||
|
||||
# Simple substitution for initial values
|
||||
return (
|
||||
template.replace("{default_address}", default_address)
|
||||
.replace("{ble_selected}", " selected" if default_transport == "ble" else "")
|
||||
.replace("{classic_selected}", " selected" if default_transport == "classic" else "")
|
||||
.replace("{default_channel}", str(_DEFAULT_CHANNEL))
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@app.get("/", include_in_schema=False, response_class=HTMLResponse)
|
||||
async def root():
|
||||
"""Serve a compact printer UI for Home Assistant."""
|
||||
return HTMLResponse(_ui_html())
|
||||
|
||||
|
||||
@app.get("/docs", include_in_schema=False)
|
||||
async def docs():
|
||||
"""Serve Swagger UI with ingress-safe relative OpenAPI URL."""
|
||||
return get_swagger_ui_html(
|
||||
openapi_url="openapi.json",
|
||||
title=f"{app.title} - Swagger UI",
|
||||
)
|
||||
|
||||
|
||||
@app.get(
|
||||
"/status",
|
||||
summary="Get printer status",
|
||||
response_description="Current printer status flags",
|
||||
)
|
||||
async def get_status(
|
||||
address: str | None = None,
|
||||
classic: bool = _DEFAULT_CLASSIC,
|
||||
channel: int = _DEFAULT_CHANNEL,
|
||||
):
|
||||
"""Return the real-time status of the printer (paper, battery, heat, …)."""
|
||||
try:
|
||||
async with connect(_address(address), classic=classic, channel=channel) as pc:
|
||||
status = await pc.get_status()
|
||||
except PrinterNotFound as exc:
|
||||
raise HTTPException(status_code=404, detail=str(exc)) from exc
|
||||
except PrinterTimeout as exc:
|
||||
raise HTTPException(status_code=504, detail=str(exc)) from exc
|
||||
except PrinterError as exc:
|
||||
raise HTTPException(status_code=502, detail=str(exc)) from exc
|
||||
|
||||
return {
|
||||
"ok": status.ok,
|
||||
"printing": status.printing,
|
||||
"cover_open": status.cover_open,
|
||||
"no_paper": status.no_paper,
|
||||
"low_battery": status.low_battery,
|
||||
"overheated": status.overheated,
|
||||
"charging": status.charging,
|
||||
"raw": status.raw,
|
||||
}
|
||||
|
||||
|
||||
@app.get(
|
||||
"/info",
|
||||
summary="Get printer info",
|
||||
response_description="Model, firmware, serial number and battery level",
|
||||
)
|
||||
async def get_info(
|
||||
address: str | None = None,
|
||||
classic: bool = _DEFAULT_CLASSIC,
|
||||
channel: int = _DEFAULT_CHANNEL,
|
||||
):
|
||||
"""Return static and dynamic printer information."""
|
||||
try:
|
||||
async with connect(_address(address), classic=classic, channel=channel) as pc:
|
||||
info = await pc.get_info()
|
||||
info.update(await pc.get_all_info())
|
||||
except PrinterNotFound as exc:
|
||||
raise HTTPException(status_code=404, detail=str(exc)) from exc
|
||||
except PrinterTimeout as exc:
|
||||
raise HTTPException(status_code=504, detail=str(exc)) from exc
|
||||
except PrinterError as exc:
|
||||
raise HTTPException(status_code=502, detail=str(exc)) from exc
|
||||
|
||||
return info
|
||||
|
||||
|
||||
@app.post(
|
||||
"/pair",
|
||||
summary="Pair and trust a Bluetooth device",
|
||||
status_code=200,
|
||||
)
|
||||
async def pair_device(
|
||||
address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None,
|
||||
):
|
||||
"""
|
||||
Attempt to pair and trust the device using `bluetoothctl`.
|
||||
This is intended for setting up Classic Bluetooth connections.
|
||||
"""
|
||||
addr = _address(address)
|
||||
if not addr:
|
||||
raise HTTPException(status_code=422, detail="Address is required to pair.")
|
||||
|
||||
# Basic validation for MAC address format to mitigate injection risk.
|
||||
if not re.match(r"^[0-9A-F]{2}(:[0-9A-F]{2}){5}$", addr, re.IGNORECASE):
|
||||
raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}")
|
||||
|
||||
cmd = f'echo -e "pair {addr}\\ntrust {addr}\\nquit" | bluetoothctl'
|
||||
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_shell(
|
||||
cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15.0)
|
||||
except FileNotFoundError:
|
||||
raise HTTPException(status_code=500, detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?")
|
||||
except asyncio.TimeoutError:
|
||||
raise HTTPException(status_code=504, detail="`bluetoothctl` command timed out after 15 seconds.")
|
||||
|
||||
output = stdout.decode(errors="ignore")
|
||||
error = stderr.decode(errors="ignore")
|
||||
|
||||
if "Failed to pair" in output or "not available" in output.lower():
|
||||
raise HTTPException(status_code=502, detail=f"Pairing failed. Output: {output}. Error: {error}")
|
||||
|
||||
return {"ok": True, "message": "Pair/trust command sent. Check output for details.", "output": output, "error": error}
|
||||
|
||||
|
||||
@app.post(
|
||||
"/unpair",
|
||||
summary="Unpair a Bluetooth device",
|
||||
status_code=200,
|
||||
)
|
||||
async def unpair_device(
|
||||
address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None,
|
||||
):
|
||||
"""
|
||||
Attempt to unpair the device using `bluetoothctl`.
|
||||
"""
|
||||
addr = _address(address)
|
||||
if not addr:
|
||||
raise HTTPException(status_code=422, detail="Address is required to unpair.")
|
||||
|
||||
# Basic validation for MAC address format to mitigate injection risk.
|
||||
if not re.match(r"^[0-9A-F]{2}(:[0-9A-F]{2}){5}$", addr, re.IGNORECASE):
|
||||
raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}")
|
||||
|
||||
cmd = f'echo -e "remove {addr}\\nquit" | bluetoothctl'
|
||||
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_shell(
|
||||
cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15.0)
|
||||
except FileNotFoundError:
|
||||
raise HTTPException(status_code=500, detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?")
|
||||
except asyncio.TimeoutError:
|
||||
raise HTTPException(status_code=504, detail="`bluetoothctl` command timed out after 15 seconds.")
|
||||
|
||||
output = stdout.decode(errors="ignore")
|
||||
error = stderr.decode(errors="ignore")
|
||||
|
||||
if "Failed to remove" in output or "not available" in output.lower():
|
||||
raise HTTPException(status_code=502, detail=f"Unpairing failed. Output: {output}. Error: {error}")
|
||||
|
||||
return {"ok": True, "message": "Unpair command sent. Check output for details.", "output": output, "error": error}
|
||||
|
||||
@app.post(
|
||||
"/print/text",
|
||||
summary="Print a text label",
|
||||
status_code=200,
|
||||
)
|
||||
async def print_text(
|
||||
text: Annotated[str, Form(description="Text to print on the label")],
|
||||
density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
|
||||
paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
|
||||
copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
|
||||
font_size: Annotated[int, Form(description="Font size in points", ge=6, le=200)] = 30,
|
||||
label_length: Annotated[int | None, Form(description="Label length in mm (overrides label_height)", ge=5, le=500)] = None,
|
||||
label_height: Annotated[int, Form(description="Label height in pixels", ge=40, le=4000)] = 240,
|
||||
address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
|
||||
classic: Annotated[bool, Form(description="Use Classic Bluetooth RFCOMM")] = _DEFAULT_CLASSIC,
|
||||
channel: Annotated[int, Form(description="RFCOMM channel")] = _DEFAULT_CHANNEL,
|
||||
):
|
||||
"""Print a plain-text label.
|
||||
|
||||
The text is rendered as a 96 px wide, 1-bit image and sent to the printer.
|
||||
"""
|
||||
paper_val = _parse_paper(paper)
|
||||
max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height
|
||||
|
||||
img = text_to_image(text, font_size=font_size, label_height=max_rows)
|
||||
|
||||
try:
|
||||
async with connect(_address(address), classic=classic, channel=channel) as pc:
|
||||
ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies,
|
||||
dither=False, max_rows=max_rows)
|
||||
except PrinterNotFound as exc:
|
||||
raise HTTPException(status_code=404, detail=str(exc)) from exc
|
||||
except PrinterTimeout as exc:
|
||||
raise HTTPException(status_code=504, detail=str(exc)) from exc
|
||||
except PrinterError as exc:
|
||||
raise HTTPException(status_code=502, detail=str(exc)) from exc
|
||||
|
||||
if not ok:
|
||||
raise HTTPException(status_code=502, detail="Printer did not confirm completion.")
|
||||
|
||||
return {"ok": True, "copies": copies, "text": text}
|
||||
|
||||
|
||||
@app.post(
|
||||
"/print/image",
|
||||
summary="Print an image",
|
||||
status_code=200,
|
||||
)
|
||||
async def print_image(
|
||||
file: Annotated[UploadFile, File(description="Image file to print (PNG, JPEG, BMP, …)")],
|
||||
density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
|
||||
paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
|
||||
copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
|
||||
dither: Annotated[bool, Form(description="Apply Floyd-Steinberg dithering")] = True,
|
||||
label_length: Annotated[int | None, Form(description="Max label length in mm (overrides label_height)", ge=5, le=500)] = None,
|
||||
label_height: Annotated[int, Form(description="Max label height in pixels", ge=40, le=4000)] = 240,
|
||||
address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
|
||||
classic: Annotated[bool, Form(description="Use Classic Bluetooth RFCOMM")] = _DEFAULT_CLASSIC,
|
||||
channel: Annotated[int, Form(description="RFCOMM channel")] = _DEFAULT_CHANNEL,
|
||||
):
|
||||
"""Print an image file.
|
||||
|
||||
The image is resized to 96 px wide, optionally dithered to 1-bit, and sent to the printer.
|
||||
Supported formats: PNG, JPEG, BMP, GIF, TIFF, WEBP.
|
||||
"""
|
||||
# Validate content type loosely — Pillow will raise on unsupported data
|
||||
data = await file.read()
|
||||
if not data:
|
||||
raise HTTPException(status_code=422, detail="Uploaded file is empty.")
|
||||
|
||||
try:
|
||||
img = Image.open(io.BytesIO(data))
|
||||
img.load() # ensure the image is fully decoded
|
||||
except Exception as exc:
|
||||
raise HTTPException(status_code=422, detail=f"Cannot decode image: {exc}") from exc
|
||||
|
||||
paper_val = _parse_paper(paper)
|
||||
max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height
|
||||
|
||||
try:
|
||||
async with connect(_address(address), classic=classic, channel=channel) as pc:
|
||||
ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies,
|
||||
dither=dither, max_rows=max_rows)
|
||||
except PrinterNotFound as exc:
|
||||
raise HTTPException(status_code=404, detail=str(exc)) from exc
|
||||
except PrinterTimeout as exc:
|
||||
raise HTTPException(status_code=504, detail=str(exc)) from exc
|
||||
except PrinterError as exc:
|
||||
raise HTTPException(status_code=502, detail=str(exc)) from exc
|
||||
|
||||
if not ok:
|
||||
raise HTTPException(status_code=502, detail="Printer did not confirm completion.")
|
||||
|
||||
return {"ok": True, "copies": copies, "filename": file.filename}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main() -> None:
|
||||
"""Start the Fichero HTTP API server."""
|
||||
global _DEFAULT_ADDRESS, _DEFAULT_CLASSIC, _DEFAULT_CHANNEL
|
||||
|
||||
try:
|
||||
import uvicorn # noqa: PLC0415
|
||||
except ImportError:
|
||||
print("ERROR: uvicorn is required to run the API server.")
|
||||
print("Install it with: pip install 'fichero-printer[api]'")
|
||||
raise SystemExit(1) from None
|
||||
|
||||
parser = argparse.ArgumentParser(description="Fichero Printer API Server")
|
||||
parser.add_argument("--host", default="127.0.0.1", help="Bind host (default: 127.0.0.1)")
|
||||
parser.add_argument("--port", type=int, default=8765, help="Bind port (default: 8765)")
|
||||
parser.add_argument("--address", default=_DEFAULT_ADDRESS, metavar="BLE_ADDR",
|
||||
help="Default BLE address (or set FICHERO_ADDR env var)")
|
||||
parser.add_argument("--classic", action="store_true", default=_DEFAULT_CLASSIC,
|
||||
help="Default to Classic Bluetooth RFCOMM")
|
||||
parser.add_argument("--channel", type=int, default=_DEFAULT_CHANNEL,
|
||||
help="Default RFCOMM channel (default: 1)")
|
||||
parser.add_argument("--reload", action="store_true", help="Enable auto-reload (development)")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Push CLI overrides into module-level defaults so all handlers pick them up
|
||||
_DEFAULT_ADDRESS = args.address
|
||||
_DEFAULT_CLASSIC = args.classic
|
||||
_DEFAULT_CHANNEL = args.channel
|
||||
|
||||
# Pass the app object directly when not reloading so that the module-level
|
||||
# globals (_DEFAULT_ADDRESS etc.) set above are visible to the handlers.
|
||||
# The string form "fichero.api:app" is required for --reload only, because
|
||||
# uvicorn's reloader needs to re-import the module in a worker process.
|
||||
uvicorn.run(
|
||||
"fichero.api:app" if args.reload else app,
|
||||
host=args.host,
|
||||
port=args.port,
|
||||
reload=args.reload,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user