diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..e69de29 diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..5531baf --- /dev/null +++ b/Dockerfile @@ -0,0 +1,9 @@ +FROM python:3.12-slim + +WORKDIR /app +COPY . /app + +RUN pip install pillow bleak fastapi uvicorn python-multipart + +EXPOSE 8765 +CMD ["python", "main.py"] diff --git a/fichero/api.py b/fichero/api.py new file mode 100644 index 0000000..e621cd0 --- /dev/null +++ b/fichero/api.py @@ -0,0 +1,418 @@ +"""HTTP REST API for the Fichero D11s thermal label printer. + +Start with: + fichero-server [--host HOST] [--port PORT] +or: + python -m fichero.api + + +Endpoints: + GET /status – Printer status + GET /info – Printer info (model, firmware, battery, …) + POST /print/text – Print a text label + POST /print/image – Print an uploaded image file +""" + +from __future__ import annotations + +import argparse +import asyncio +import io +import re +import os +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Annotated + +from fastapi import FastAPI, File, Form, HTTPException, UploadFile +from fastapi.middleware.cors import CORSMiddleware +from fastapi.openapi.docs import get_swagger_ui_html +from fastapi.responses import HTMLResponse, RedirectResponse +from PIL import Image + +from fichero.cli import DOTS_PER_MM, do_print +from fichero.imaging import text_to_image +from fichero.printer import ( + PAPER_GAP, + PrinterError, + PrinterNotFound, + PrinterTimeout, + connect, +) + +# --------------------------------------------------------------------------- +# Global connection settings (env vars or CLI flags at startup) +# --------------------------------------------------------------------------- + +_DEFAULT_ADDRESS: str | None = os.environ.get("FICHERO_ADDR") +_DEFAULT_CLASSIC: bool = os.environ.get("FICHERO_TRANSPORT", "").lower() == "classic" +_DEFAULT_CHANNEL: int = int(os.environ.get("FICHERO_CHANNEL", "1")) + +_PAPER_MAP = {"gap": 0, "black": 1, "continuous": 2} + + +def _parse_paper(value: str) -> int: + if value in _PAPER_MAP: + return _PAPER_MAP[value] + try: + val = int(value) + if 0 <= val <= 2: + return val + except ValueError: + pass + raise HTTPException(status_code=422, detail=f"Invalid paper type '{value}'. Use gap, black, continuous or 0-2.") + + +# --------------------------------------------------------------------------- +# FastAPI app +# --------------------------------------------------------------------------- + +@asynccontextmanager +async def lifespan(app: FastAPI): # noqa: ARG001 + yield + + +app = FastAPI( + title="Fichero Printer API", + description="REST API for the Fichero D11s (AiYin) thermal label printer.", + version="0.1.27", + lifespan=lifespan, + docs_url=None, + redoc_url=None, +) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], +) + + +def _address(address: str | None) -> str | None: + """Return the effective BLE address (request value overrides env default).""" + return address or _DEFAULT_ADDRESS + + +def _ui_html() -> str: + default_address = _DEFAULT_ADDRESS or "" + default_transport = "classic" if _DEFAULT_CLASSIC else "ble" + + try: + template_path = Path(__file__).parent / "index.html" + template = template_path.read_text(encoding="utf-8") + except FileNotFoundError: + return "

Error: index.html not found

" + + # Simple substitution for initial values + return ( + template.replace("{default_address}", default_address) + .replace("{ble_selected}", " selected" if default_transport == "ble" else "") + .replace("{classic_selected}", " selected" if default_transport == "classic" else "") + .replace("{default_channel}", str(_DEFAULT_CHANNEL)) + ) + + +# --------------------------------------------------------------------------- +# Endpoints +# --------------------------------------------------------------------------- + +@app.get("/", include_in_schema=False, response_class=HTMLResponse) +async def root(): + """Serve a compact printer UI for Home Assistant.""" + return HTMLResponse(_ui_html()) + + +@app.get("/docs", include_in_schema=False) +async def docs(): + """Serve Swagger UI with ingress-safe relative OpenAPI URL.""" + return get_swagger_ui_html( + openapi_url="openapi.json", + title=f"{app.title} - Swagger UI", + ) + + +@app.get( + "/status", + summary="Get printer status", + response_description="Current printer status flags", +) +async def get_status( + address: str | None = None, + classic: bool = _DEFAULT_CLASSIC, + channel: int = _DEFAULT_CHANNEL, +): + """Return the real-time status of the printer (paper, battery, heat, …).""" + try: + async with connect(_address(address), classic=classic, channel=channel) as pc: + status = await pc.get_status() + except PrinterNotFound as exc: + raise HTTPException(status_code=404, detail=str(exc)) from exc + except PrinterTimeout as exc: + raise HTTPException(status_code=504, detail=str(exc)) from exc + except PrinterError as exc: + raise HTTPException(status_code=502, detail=str(exc)) from exc + + return { + "ok": status.ok, + "printing": status.printing, + "cover_open": status.cover_open, + "no_paper": status.no_paper, + "low_battery": status.low_battery, + "overheated": status.overheated, + "charging": status.charging, + "raw": status.raw, + } + + +@app.get( + "/info", + summary="Get printer info", + response_description="Model, firmware, serial number and battery level", +) +async def get_info( + address: str | None = None, + classic: bool = _DEFAULT_CLASSIC, + channel: int = _DEFAULT_CHANNEL, +): + """Return static and dynamic printer information.""" + try: + async with connect(_address(address), classic=classic, channel=channel) as pc: + info = await pc.get_info() + info.update(await pc.get_all_info()) + except PrinterNotFound as exc: + raise HTTPException(status_code=404, detail=str(exc)) from exc + except PrinterTimeout as exc: + raise HTTPException(status_code=504, detail=str(exc)) from exc + except PrinterError as exc: + raise HTTPException(status_code=502, detail=str(exc)) from exc + + return info + + +@app.post( + "/pair", + summary="Pair and trust a Bluetooth device", + status_code=200, +) +async def pair_device( + address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None, +): + """ + Attempt to pair and trust the device using `bluetoothctl`. + This is intended for setting up Classic Bluetooth connections. + """ + addr = _address(address) + if not addr: + raise HTTPException(status_code=422, detail="Address is required to pair.") + + # Basic validation for MAC address format to mitigate injection risk. + if not re.match(r"^[0-9A-F]{2}(:[0-9A-F]{2}){5}$", addr, re.IGNORECASE): + raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}") + + cmd = f'echo -e "pair {addr}\\ntrust {addr}\\nquit" | bluetoothctl' + + try: + proc = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15.0) + except FileNotFoundError: + raise HTTPException(status_code=500, detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?") + except asyncio.TimeoutError: + raise HTTPException(status_code=504, detail="`bluetoothctl` command timed out after 15 seconds.") + + output = stdout.decode(errors="ignore") + error = stderr.decode(errors="ignore") + + if "Failed to pair" in output or "not available" in output.lower(): + raise HTTPException(status_code=502, detail=f"Pairing failed. Output: {output}. Error: {error}") + + return {"ok": True, "message": "Pair/trust command sent. Check output for details.", "output": output, "error": error} + + +@app.post( + "/unpair", + summary="Unpair a Bluetooth device", + status_code=200, +) +async def unpair_device( + address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None, +): + """ + Attempt to unpair the device using `bluetoothctl`. + """ + addr = _address(address) + if not addr: + raise HTTPException(status_code=422, detail="Address is required to unpair.") + + # Basic validation for MAC address format to mitigate injection risk. + if not re.match(r"^[0-9A-F]{2}(:[0-9A-F]{2}){5}$", addr, re.IGNORECASE): + raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}") + + cmd = f'echo -e "remove {addr}\\nquit" | bluetoothctl' + + try: + proc = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15.0) + except FileNotFoundError: + raise HTTPException(status_code=500, detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?") + except asyncio.TimeoutError: + raise HTTPException(status_code=504, detail="`bluetoothctl` command timed out after 15 seconds.") + + output = stdout.decode(errors="ignore") + error = stderr.decode(errors="ignore") + + if "Failed to remove" in output or "not available" in output.lower(): + raise HTTPException(status_code=502, detail=f"Unpairing failed. Output: {output}. Error: {error}") + + return {"ok": True, "message": "Unpair command sent. Check output for details.", "output": output, "error": error} + +@app.post( + "/print/text", + summary="Print a text label", + status_code=200, +) +async def print_text( + text: Annotated[str, Form(description="Text to print on the label")], + density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2, + paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap", + copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1, + font_size: Annotated[int, Form(description="Font size in points", ge=6, le=200)] = 30, + label_length: Annotated[int | None, Form(description="Label length in mm (overrides label_height)", ge=5, le=500)] = None, + label_height: Annotated[int, Form(description="Label height in pixels", ge=40, le=4000)] = 240, + address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None, + classic: Annotated[bool, Form(description="Use Classic Bluetooth RFCOMM")] = _DEFAULT_CLASSIC, + channel: Annotated[int, Form(description="RFCOMM channel")] = _DEFAULT_CHANNEL, +): + """Print a plain-text label. + + The text is rendered as a 96 px wide, 1-bit image and sent to the printer. + """ + paper_val = _parse_paper(paper) + max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height + + img = text_to_image(text, font_size=font_size, label_height=max_rows) + + try: + async with connect(_address(address), classic=classic, channel=channel) as pc: + ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies, + dither=False, max_rows=max_rows) + except PrinterNotFound as exc: + raise HTTPException(status_code=404, detail=str(exc)) from exc + except PrinterTimeout as exc: + raise HTTPException(status_code=504, detail=str(exc)) from exc + except PrinterError as exc: + raise HTTPException(status_code=502, detail=str(exc)) from exc + + if not ok: + raise HTTPException(status_code=502, detail="Printer did not confirm completion.") + + return {"ok": True, "copies": copies, "text": text} + + +@app.post( + "/print/image", + summary="Print an image", + status_code=200, +) +async def print_image( + file: Annotated[UploadFile, File(description="Image file to print (PNG, JPEG, BMP, …)")], + density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2, + paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap", + copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1, + dither: Annotated[bool, Form(description="Apply Floyd-Steinberg dithering")] = True, + label_length: Annotated[int | None, Form(description="Max label length in mm (overrides label_height)", ge=5, le=500)] = None, + label_height: Annotated[int, Form(description="Max label height in pixels", ge=40, le=4000)] = 240, + address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None, + classic: Annotated[bool, Form(description="Use Classic Bluetooth RFCOMM")] = _DEFAULT_CLASSIC, + channel: Annotated[int, Form(description="RFCOMM channel")] = _DEFAULT_CHANNEL, +): + """Print an image file. + + The image is resized to 96 px wide, optionally dithered to 1-bit, and sent to the printer. + Supported formats: PNG, JPEG, BMP, GIF, TIFF, WEBP. + """ + # Validate content type loosely — Pillow will raise on unsupported data + data = await file.read() + if not data: + raise HTTPException(status_code=422, detail="Uploaded file is empty.") + + try: + img = Image.open(io.BytesIO(data)) + img.load() # ensure the image is fully decoded + except Exception as exc: + raise HTTPException(status_code=422, detail=f"Cannot decode image: {exc}") from exc + + paper_val = _parse_paper(paper) + max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height + + try: + async with connect(_address(address), classic=classic, channel=channel) as pc: + ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies, + dither=dither, max_rows=max_rows) + except PrinterNotFound as exc: + raise HTTPException(status_code=404, detail=str(exc)) from exc + except PrinterTimeout as exc: + raise HTTPException(status_code=504, detail=str(exc)) from exc + except PrinterError as exc: + raise HTTPException(status_code=502, detail=str(exc)) from exc + + if not ok: + raise HTTPException(status_code=502, detail="Printer did not confirm completion.") + + return {"ok": True, "copies": copies, "filename": file.filename} + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +def main() -> None: + """Start the Fichero HTTP API server.""" + global _DEFAULT_ADDRESS, _DEFAULT_CLASSIC, _DEFAULT_CHANNEL + + try: + import uvicorn # noqa: PLC0415 + except ImportError: + print("ERROR: uvicorn is required to run the API server.") + print("Install it with: pip install 'fichero-printer[api]'") + raise SystemExit(1) from None + + parser = argparse.ArgumentParser(description="Fichero Printer API Server") + parser.add_argument("--host", default="127.0.0.1", help="Bind host (default: 127.0.0.1)") + parser.add_argument("--port", type=int, default=8765, help="Bind port (default: 8765)") + parser.add_argument("--address", default=_DEFAULT_ADDRESS, metavar="BLE_ADDR", + help="Default BLE address (or set FICHERO_ADDR env var)") + parser.add_argument("--classic", action="store_true", default=_DEFAULT_CLASSIC, + help="Default to Classic Bluetooth RFCOMM") + parser.add_argument("--channel", type=int, default=_DEFAULT_CHANNEL, + help="Default RFCOMM channel (default: 1)") + parser.add_argument("--reload", action="store_true", help="Enable auto-reload (development)") + args = parser.parse_args() + + # Push CLI overrides into module-level defaults so all handlers pick them up + _DEFAULT_ADDRESS = args.address + _DEFAULT_CLASSIC = args.classic + _DEFAULT_CHANNEL = args.channel + + # Pass the app object directly when not reloading so that the module-level + # globals (_DEFAULT_ADDRESS etc.) set above are visible to the handlers. + # The string form "fichero.api:app" is required for --reload only, because + # uvicorn's reloader needs to re-import the module in a worker process. + uvicorn.run( + "fichero.api:app" if args.reload else app, + host=args.host, + port=args.port, + reload=args.reload, + ) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index e012c53..04fb0ae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "fichero-printer" -version = "0.1.0" +version = "1.0.0" description = "Fichero D11s thermal label printer - BLE CLI tool" requires-python = ">=3.10" dependencies = [ diff --git a/repository.yaml b/repository.yaml new file mode 100644 index 0000000..3355d47 --- /dev/null +++ b/repository.yaml @@ -0,0 +1,3 @@ +name: "Fichero Printer" +url: "https://git.leuschner.dev/Tobias/Fichero" +maintainer: "Tobias Leuschner" diff --git a/sync_addon.sh b/sync_addon.sh new file mode 100644 index 0000000..e69de29