"""HTTP REST API for the Fichero D11s thermal label printer. Start with: fichero-server [--host HOST] [--port PORT] or: python -m fichero.api Endpoints: GET /status – Printer status GET /info – Printer info (model, firmware, battery, …) POST /print/text – Print a text label POST /print/image – Print an uploaded image file """ from __future__ import annotations import argparse import asyncio import io import re import os from contextlib import asynccontextmanager from pathlib import Path from typing import Annotated from fastapi import FastAPI, File, Form, HTTPException, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.openapi.docs import get_swagger_ui_html from fastapi.responses import HTMLResponse, RedirectResponse from PIL import Image from fichero.cli import DOTS_PER_MM, do_print from fichero.imaging import text_to_image from fichero.printer import ( PAPER_GAP, PrinterError, PrinterNotFound, PrinterTimeout, connect, ) # --------------------------------------------------------------------------- # Global connection settings (env vars or CLI flags at startup) # --------------------------------------------------------------------------- _DEFAULT_ADDRESS: str | None = os.environ.get("FICHERO_ADDR") _DEFAULT_CLASSIC: bool = os.environ.get("FICHERO_TRANSPORT", "").lower() == "classic" _DEFAULT_CHANNEL: int = int(os.environ.get("FICHERO_CHANNEL", "1")) _PAPER_MAP = {"gap": 0, "black": 1, "continuous": 2} def _parse_paper(value: str) -> int: if value in _PAPER_MAP: return _PAPER_MAP[value] try: val = int(value) if 0 <= val <= 2: return val except ValueError: pass raise HTTPException(status_code=422, detail=f"Invalid paper type '{value}'. Use gap, black, continuous or 0-2.") # --------------------------------------------------------------------------- # FastAPI app # --------------------------------------------------------------------------- @asynccontextmanager async def lifespan(app: FastAPI): # noqa: ARG001 yield app = FastAPI( title="Fichero Printer API", description="REST API for the Fichero D11s (AiYin) thermal label printer.", version = "0.1.29", lifespan=lifespan, docs_url=None, redoc_url=None, ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) def _address(address: str | None) -> str | None: """Return the effective BLE address (request value overrides env default).""" return address or _DEFAULT_ADDRESS def _ui_html() -> str: default_address = _DEFAULT_ADDRESS or "" default_transport = "classic" if _DEFAULT_CLASSIC else "ble" try: template_path = Path(__file__).parent / "index.html" template = template_path.read_text(encoding="utf-8") except FileNotFoundError: return "

Error: index.html not found

" # Simple substitution for initial values return ( template.replace("{default_address}", default_address) .replace("{ble_selected}", " selected" if default_transport == "ble" else "") .replace("{classic_selected}", " selected" if default_transport == "classic" else "") .replace("{default_channel}", str(_DEFAULT_CHANNEL)) ) # --------------------------------------------------------------------------- # Endpoints # --------------------------------------------------------------------------- @app.get("/", include_in_schema=False, response_class=HTMLResponse) async def root(): """Serve a compact printer UI for Home Assistant.""" return HTMLResponse(_ui_html()) @app.get("/docs", include_in_schema=False) async def docs(): """Serve Swagger UI with ingress-safe relative OpenAPI URL.""" return get_swagger_ui_html( openapi_url="openapi.json", title=f"{app.title} - Swagger UI", ) @app.get( "/status", summary="Get printer status", response_description="Current printer status flags", ) async def get_status( address: str | None = None, classic: bool = _DEFAULT_CLASSIC, channel: int = _DEFAULT_CHANNEL, ): """Return the real-time status of the printer (paper, battery, heat, …).""" try: async with connect(_address(address), classic=classic, channel=channel) as pc: status = await pc.get_status() except PrinterNotFound as exc: raise HTTPException(status_code=404, detail=str(exc)) from exc except PrinterTimeout as exc: raise HTTPException(status_code=504, detail=str(exc)) from exc except PrinterError as exc: raise HTTPException(status_code=502, detail=str(exc)) from exc return { "ok": status.ok, "printing": status.printing, "cover_open": status.cover_open, "no_paper": status.no_paper, "low_battery": status.low_battery, "overheated": status.overheated, "charging": status.charging, "raw": status.raw, } @app.get( "/info", summary="Get printer info", response_description="Model, firmware, serial number and battery level", ) async def get_info( address: str | None = None, classic: bool = _DEFAULT_CLASSIC, channel: int = _DEFAULT_CHANNEL, ): """Return static and dynamic printer information.""" try: async with connect(_address(address), classic=classic, channel=channel) as pc: info = await pc.get_info() info.update(await pc.get_all_info()) except PrinterNotFound as exc: raise HTTPException(status_code=404, detail=str(exc)) from exc except PrinterTimeout as exc: raise HTTPException(status_code=504, detail=str(exc)) from exc except PrinterError as exc: raise HTTPException(status_code=502, detail=str(exc)) from exc return info @app.post( "/pair", summary="Pair and trust a Bluetooth device", status_code=200, ) async def pair_device( address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None, ): """ Attempt to pair and trust the device using `bluetoothctl`. This is intended for setting up Classic Bluetooth connections. """ addr = _address(address) if not addr: raise HTTPException(status_code=422, detail="Address is required to pair.") # Basic validation for MAC address format to mitigate injection risk. if not re.match(r"^[0-9A-F]{2}(:[0-9A-F]{2}){5}$", addr, re.IGNORECASE): raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}") cmd = f'echo -e "pair {addr}\\ntrust {addr}\\nquit" | bluetoothctl' try: proc = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15.0) except FileNotFoundError: raise HTTPException(status_code=500, detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?") except asyncio.TimeoutError: raise HTTPException(status_code=504, detail="`bluetoothctl` command timed out after 15 seconds.") output = stdout.decode(errors="ignore") error = stderr.decode(errors="ignore") if "Failed to pair" in output or "not available" in output.lower(): raise HTTPException(status_code=502, detail=f"Pairing failed. Output: {output}. Error: {error}") return {"ok": True, "message": "Pair/trust command sent. Check output for details.", "output": output, "error": error} @app.post( "/unpair", summary="Unpair a Bluetooth device", status_code=200, ) async def unpair_device( address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None, ): """ Attempt to unpair the device using `bluetoothctl`. """ addr = _address(address) if not addr: raise HTTPException(status_code=422, detail="Address is required to unpair.") # Basic validation for MAC address format to mitigate injection risk. if not re.match(r"^[0-9A-F]{2}(:[0-9A-F]{2}){5}$", addr, re.IGNORECASE): raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}") cmd = f'echo -e "remove {addr}\\nquit" | bluetoothctl' try: proc = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15.0) except FileNotFoundError: raise HTTPException(status_code=500, detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?") except asyncio.TimeoutError: raise HTTPException(status_code=504, detail="`bluetoothctl` command timed out after 15 seconds.") output = stdout.decode(errors="ignore") error = stderr.decode(errors="ignore") if "Failed to remove" in output or "not available" in output.lower(): raise HTTPException(status_code=502, detail=f"Unpairing failed. Output: {output}. Error: {error}") return {"ok": True, "message": "Unpair command sent. Check output for details.", "output": output, "error": error} @app.post( "/print/text", summary="Print a text label", status_code=200, ) async def print_text( text: Annotated[str, Form(description="Text to print on the label")], density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2, paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap", copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1, font_size: Annotated[int, Form(description="Font size in points", ge=6, le=200)] = 30, label_length: Annotated[int | None, Form(description="Label length in mm (overrides label_height)", ge=5, le=500)] = None, label_height: Annotated[int, Form(description="Label height in pixels", ge=40, le=4000)] = 240, address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None, classic: Annotated[bool, Form(description="Use Classic Bluetooth RFCOMM")] = _DEFAULT_CLASSIC, channel: Annotated[int, Form(description="RFCOMM channel")] = _DEFAULT_CHANNEL, ): """Print a plain-text label. The text is rendered as a 96 px wide, 1-bit image and sent to the printer. """ paper_val = _parse_paper(paper) max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height img = text_to_image(text, font_size=font_size, label_height=max_rows) try: async with connect(_address(address), classic=classic, channel=channel) as pc: ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies, dither=False, max_rows=max_rows) except PrinterNotFound as exc: raise HTTPException(status_code=404, detail=str(exc)) from exc except PrinterTimeout as exc: raise HTTPException(status_code=504, detail=str(exc)) from exc except PrinterError as exc: raise HTTPException(status_code=502, detail=str(exc)) from exc if not ok: raise HTTPException(status_code=502, detail="Printer did not confirm completion.") return {"ok": True, "copies": copies, "text": text} @app.post( "/print/image", summary="Print an image", status_code=200, ) async def print_image( file: Annotated[UploadFile, File(description="Image file to print (PNG, JPEG, BMP, …)")], density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2, paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap", copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1, dither: Annotated[bool, Form(description="Apply Floyd-Steinberg dithering")] = True, label_length: Annotated[int | None, Form(description="Max label length in mm (overrides label_height)", ge=5, le=500)] = None, label_height: Annotated[int, Form(description="Max label height in pixels", ge=40, le=4000)] = 240, address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None, classic: Annotated[bool, Form(description="Use Classic Bluetooth RFCOMM")] = _DEFAULT_CLASSIC, channel: Annotated[int, Form(description="RFCOMM channel")] = _DEFAULT_CHANNEL, ): """Print an image file. The image is resized to 96 px wide, optionally dithered to 1-bit, and sent to the printer. Supported formats: PNG, JPEG, BMP, GIF, TIFF, WEBP. """ # Validate content type loosely — Pillow will raise on unsupported data data = await file.read() if not data: raise HTTPException(status_code=422, detail="Uploaded file is empty.") try: img = Image.open(io.BytesIO(data)) img.load() # ensure the image is fully decoded except Exception as exc: raise HTTPException(status_code=422, detail=f"Cannot decode image: {exc}") from exc paper_val = _parse_paper(paper) max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height try: async with connect(_address(address), classic=classic, channel=channel) as pc: ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies, dither=dither, max_rows=max_rows) except PrinterNotFound as exc: raise HTTPException(status_code=404, detail=str(exc)) from exc except PrinterTimeout as exc: raise HTTPException(status_code=504, detail=str(exc)) from exc except PrinterError as exc: raise HTTPException(status_code=502, detail=str(exc)) from exc if not ok: raise HTTPException(status_code=502, detail="Printer did not confirm completion.") return {"ok": True, "copies": copies, "filename": file.filename} # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def main() -> None: """Start the Fichero HTTP API server.""" global _DEFAULT_ADDRESS, _DEFAULT_CLASSIC, _DEFAULT_CHANNEL try: import uvicorn # noqa: PLC0415 except ImportError: print("ERROR: uvicorn is required to run the API server.") print("Install it with: pip install 'fichero-printer[api]'") raise SystemExit(1) from None parser = argparse.ArgumentParser(description="Fichero Printer API Server") parser.add_argument("--host", default="127.0.0.1", help="Bind host (default: 127.0.0.1)") parser.add_argument("--port", type=int, default=8765, help="Bind port (default: 8765)") parser.add_argument("--address", default=_DEFAULT_ADDRESS, metavar="BLE_ADDR", help="Default BLE address (or set FICHERO_ADDR env var)") parser.add_argument("--classic", action="store_true", default=_DEFAULT_CLASSIC, help="Default to Classic Bluetooth RFCOMM") parser.add_argument("--channel", type=int, default=_DEFAULT_CHANNEL, help="Default RFCOMM channel (default: 1)") parser.add_argument("--reload", action="store_true", help="Enable auto-reload (development)") args = parser.parse_args() # Push CLI overrides into module-level defaults so all handlers pick them up _DEFAULT_ADDRESS = args.address _DEFAULT_CLASSIC = args.classic _DEFAULT_CHANNEL = args.channel # Pass the app object directly when not reloading so that the module-level # globals (_DEFAULT_ADDRESS etc.) set above are visible to the handlers. # The string form "fichero.api:app" is required for --reload only, because # uvicorn's reloader needs to re-import the module in a worker process. uvicorn.run( "fichero.api:app" if args.reload else app, host=args.host, port=args.port, reload=args.reload, ) if __name__ == "__main__": main()