- Implemented a new module for the Fichero D11s thermal label printer, including BLE and Classic Bluetooth interfaces. - Created a REST API using FastAPI to manage printer status, info, and printing tasks (text and images). - Developed a CLI for direct printer interaction, allowing users to print text and images, check status, and modify settings. - Added image processing capabilities for converting text and images to the required format for printing. - Introduced error handling for printer operations and connection management. - Included a shell script for running the API server with configurable parameters. - Added English translations for configuration options. - Created a repository metadata file for project management.
294 lines
11 KiB
Python
294 lines
11 KiB
Python
"""HTTP REST API for the Fichero D11s thermal label printer.

Start with:
    fichero-server [--host HOST] [--port PORT]
or:
    python -m fichero.api

Endpoints:
    GET  /status          – Printer status
    GET  /info            – Printer info (model, firmware, battery, …)
    POST /print/text      – Print a text label
    POST /print/image     – Print an uploaded image file
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import asyncio
|
||
import io
|
||
import os
|
||
from contextlib import asynccontextmanager
|
||
from typing import Annotated
|
||
|
||
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from PIL import Image
|
||
|
||
from fichero.cli import DOTS_PER_MM, do_print
|
||
from fichero.imaging import text_to_image
|
||
from fichero.printer import (
|
||
PAPER_GAP,
|
||
PrinterError,
|
||
PrinterNotFound,
|
||
PrinterTimeout,
|
||
connect,
|
||
)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Global connection settings (env vars or CLI flags at startup)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Connection defaults are read from the environment once, at import time.
_DEFAULT_ADDRESS: str | None = os.getenv("FICHERO_ADDR")
# Only the value "classic" (compared case-insensitively) turns this on.
_DEFAULT_CLASSIC: bool = os.getenv("FICHERO_TRANSPORT", "").lower() == "classic"
# A non-integer FICHERO_CHANNEL raises ValueError while the module is imported.
_DEFAULT_CHANNEL: int = int(os.getenv("FICHERO_CHANNEL", "1"))

# Accepted paper-type names and the integer code each one maps to.
_PAPER_MAP: dict[str, int] = {"gap": 0, "black": 1, "continuous": 2}
|
||
|
||
|
||
def _parse_paper(value: str) -> int:
    """Translate a paper-type request value into its integer code.

    Args:
        value: Either one of the names in ``_PAPER_MAP`` or the decimal
            string of an integer between 0 and 2 inclusive.

    Returns:
        The integer paper code.

    Raises:
        HTTPException: With status 422 when *value* is neither a known name
            nor an integer in the accepted range.
    """
    named_code = _PAPER_MAP.get(value)
    if named_code is not None:
        return named_code

    try:
        numeric_code = int(value)
    except ValueError:
        # Not a number at all; -1 is outside the accepted range below.
        numeric_code = -1

    if numeric_code in range(3):
        return numeric_code

    raise HTTPException(status_code=422, detail=f"Invalid paper type '{value}'. Use gap, black, continuous or 0-2.")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# FastAPI app
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@asynccontextmanager
async def lifespan(app: FastAPI):  # noqa: ARG001
    """Lifespan context handed to the FastAPI application.

    It performs no work before or after the ``yield``; the *app* argument is
    accepted but unused.
    """
    yield
|
||
|
||
|
||
# The ASGI application object; the route decorators below register on it.
app = FastAPI(
    title="Fichero Printer API",
    description="REST API for the Fichero D11s (AiYin) thermal label printer.",
    version="0.1.0",
    lifespan=lifespan,
)

# NOTE(review): origins, methods and headers are all wildcards, so any web
# page may call this API from a browser — confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
|
||
|
||
|
||
def _address(address: str | None) -> str | None:
    """Pick the address to connect to for one request.

    A truthy *address* wins; otherwise the current module-level
    ``_DEFAULT_ADDRESS`` is returned (which may itself be ``None``).
    """
    if address:
        return address
    return _DEFAULT_ADDRESS
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Endpoints
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@app.get(
    "/status",
    summary="Get printer status",
    response_description="Current printer status flags",
)
async def get_status(
    address: str | None = None,
    classic: bool | None = None,
    channel: int | None = None,
):
    """Return the real-time status of the printer (paper, battery, heat, …).

    Args:
        address: Printer address; falls back to the server default when omitted.
        classic: Use Classic Bluetooth RFCOMM; ``None`` means "use the server
            default".
        channel: RFCOMM channel; ``None`` means "use the server default".

    Returns:
        A dict with the status flags ``ok``, ``printing``, ``cover_open``,
        ``no_paper``, ``low_battery``, ``overheated``, ``charging`` and the
        ``raw`` status value.

    Raises:
        HTTPException: 404 for ``PrinterNotFound``, 504 for ``PrinterTimeout``,
            502 for any other ``PrinterError``.
    """
    # Resolve the defaults per request. Using the module globals directly as
    # parameter defaults would freeze their import-time values, so overrides
    # assigned later (e.g. by main()) would never be seen.
    use_classic = _DEFAULT_CLASSIC if classic is None else classic
    use_channel = _DEFAULT_CHANNEL if channel is None else channel

    try:
        async with connect(_address(address), classic=use_classic, channel=use_channel) as pc:
            status = await pc.get_status()
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc

    return {
        "ok": status.ok,
        "printing": status.printing,
        "cover_open": status.cover_open,
        "no_paper": status.no_paper,
        "low_battery": status.low_battery,
        "overheated": status.overheated,
        "charging": status.charging,
        "raw": status.raw,
    }
|
||
|
||
|
||
@app.get(
    "/info",
    summary="Get printer info",
    response_description="Model, firmware, serial number and battery level",
)
async def get_info(
    address: str | None = None,
    classic: bool | None = None,
    channel: int | None = None,
):
    """Return static and dynamic printer information.

    Args:
        address: Printer address; falls back to the server default when omitted.
        classic: Use Classic Bluetooth RFCOMM; ``None`` means "use the server
            default".
        channel: RFCOMM channel; ``None`` means "use the server default".

    Returns:
        The result of ``get_info()`` updated in place with the result of
        ``get_all_info()`` (presumably both are dicts — verify against the
        printer client).

    Raises:
        HTTPException: 404 for ``PrinterNotFound``, 504 for ``PrinterTimeout``,
            502 for any other ``PrinterError``.
    """
    # Resolve the defaults per request. Using the module globals directly as
    # parameter defaults would freeze their import-time values, so overrides
    # assigned later (e.g. by main()) would never be seen.
    use_classic = _DEFAULT_CLASSIC if classic is None else classic
    use_channel = _DEFAULT_CHANNEL if channel is None else channel

    try:
        async with connect(_address(address), classic=use_classic, channel=use_channel) as pc:
            info = await pc.get_info()
            # Keys from get_all_info() overwrite same-named keys from get_info().
            info.update(await pc.get_all_info())
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc

    return info
|
||
|
||
|
||
@app.post(
    "/print/text",
    summary="Print a text label",
    status_code=200,
)
async def print_text(
    text: Annotated[str, Form(description="Text to print on the label")],
    density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
    paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
    copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
    font_size: Annotated[int, Form(description="Font size in points", ge=6, le=200)] = 30,
    label_length: Annotated[int | None, Form(description="Label length in mm (overrides label_height)", ge=5, le=500)] = None,
    label_height: Annotated[int, Form(description="Label height in pixels", ge=40, le=4000)] = 240,
    address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
    classic: Annotated[bool | None, Form(description="Use Classic Bluetooth RFCOMM")] = None,
    channel: Annotated[int | None, Form(description="RFCOMM channel")] = None,
):
    """Print a plain-text label.

    The text is rendered as a 96 px wide, 1-bit image and sent to the printer.

    ``classic`` and ``channel`` fall back to the server defaults when omitted.

    Returns:
        ``{"ok": True, "copies": ..., "text": ...}`` once the print call
        reports success.

    Raises:
        HTTPException: 422 for an invalid paper type; 404 for
            ``PrinterNotFound``; 504 for ``PrinterTimeout``; 502 for any other
            ``PrinterError`` or when the print call returns a falsy result.
    """
    # Resolve the defaults per request. Using the module globals directly as
    # Form defaults would freeze their import-time values, so overrides
    # assigned later (e.g. by main()) would never be seen.
    use_classic = _DEFAULT_CLASSIC if classic is None else classic
    use_channel = _DEFAULT_CHANNEL if channel is None else channel

    paper_val = _parse_paper(paper)
    # A length given in millimetres takes precedence over the pixel height.
    max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height

    # Render before connecting so the printer link is only held while printing.
    img = text_to_image(text, font_size=font_size, label_height=max_rows)

    try:
        async with connect(_address(address), classic=use_classic, channel=use_channel) as pc:
            ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies,
                                dither=False, max_rows=max_rows)
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc

    if not ok:
        raise HTTPException(status_code=502, detail="Printer did not confirm completion.")

    return {"ok": True, "copies": copies, "text": text}
|
||
|
||
|
||
@app.post(
    "/print/image",
    summary="Print an image",
    status_code=200,
)
async def print_image(
    file: Annotated[UploadFile, File(description="Image file to print (PNG, JPEG, BMP, …)")],
    density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
    paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
    copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
    dither: Annotated[bool, Form(description="Apply Floyd-Steinberg dithering")] = True,
    label_length: Annotated[int | None, Form(description="Max label length in mm (overrides label_height)", ge=5, le=500)] = None,
    label_height: Annotated[int, Form(description="Max label height in pixels", ge=40, le=4000)] = 240,
    address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
    classic: Annotated[bool | None, Form(description="Use Classic Bluetooth RFCOMM")] = None,
    channel: Annotated[int | None, Form(description="RFCOMM channel")] = None,
):
    """Print an image file.

    The image is resized to 96 px wide, optionally dithered to 1-bit, and sent to the printer.
    Supported formats: PNG, JPEG, BMP, GIF, TIFF, WEBP.

    ``classic`` and ``channel`` fall back to the server defaults when omitted.

    Returns:
        ``{"ok": True, "copies": ..., "filename": ...}`` once the print call
        reports success.

    Raises:
        HTTPException: 422 for an empty upload, undecodable image data or an
            invalid paper type; 404 for ``PrinterNotFound``; 504 for
            ``PrinterTimeout``; 502 for any other ``PrinterError`` or when the
            print call returns a falsy result.
    """
    # Resolve the defaults per request. Using the module globals directly as
    # Form defaults would freeze their import-time values, so overrides
    # assigned later (e.g. by main()) would never be seen.
    use_classic = _DEFAULT_CLASSIC if classic is None else classic
    use_channel = _DEFAULT_CHANNEL if channel is None else channel

    # Validate content type loosely — Pillow will raise on unsupported data
    data = await file.read()  # the whole upload is held in memory
    if not data:
        raise HTTPException(status_code=422, detail="Uploaded file is empty.")

    try:
        img = Image.open(io.BytesIO(data))
        img.load()  # ensure the image is fully decoded
    except Exception as exc:
        # Deliberately broad: any decoding failure is reported as a client error.
        raise HTTPException(status_code=422, detail=f"Cannot decode image: {exc}") from exc

    paper_val = _parse_paper(paper)
    # A length given in millimetres takes precedence over the pixel height.
    max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height

    try:
        async with connect(_address(address), classic=use_classic, channel=use_channel) as pc:
            ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies,
                                dither=dither, max_rows=max_rows)
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc

    if not ok:
        raise HTTPException(status_code=502, detail="Printer did not confirm completion.")

    return {"ok": True, "copies": copies, "filename": file.filename}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Entry point
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> None:
    """Start the Fichero HTTP API server.

    Parses ``--host``, ``--port``, ``--address``, ``--classic``, ``--channel``
    and ``--reload`` from the command line, stores the connection options in
    the module-level defaults and runs uvicorn.

    Raises:
        SystemExit: With code 1 when uvicorn is not installed.
    """
    # The global declaration must come before any use of these names in this
    # function; they are read below as argparse defaults, and declaring them
    # afterwards is a SyntaxError.
    global _DEFAULT_ADDRESS, _DEFAULT_CLASSIC, _DEFAULT_CHANNEL

    try:
        import uvicorn  # noqa: PLC0415
    except ImportError:
        print("ERROR: uvicorn is required to run the API server.")
        print("Install it with: pip install 'fichero-printer[api]'")
        raise SystemExit(1) from None

    parser = argparse.ArgumentParser(description="Fichero Printer API Server")
    parser.add_argument("--host", default="127.0.0.1", help="Bind host (default: 127.0.0.1)")
    parser.add_argument("--port", type=int, default=8765, help="Bind port (default: 8765)")
    parser.add_argument("--address", default=_DEFAULT_ADDRESS, metavar="BLE_ADDR",
                        help="Default BLE address (or set FICHERO_ADDR env var)")
    parser.add_argument("--classic", action="store_true", default=_DEFAULT_CLASSIC,
                        help="Default to Classic Bluetooth RFCOMM")
    parser.add_argument("--channel", type=int, default=_DEFAULT_CHANNEL,
                        help="Default RFCOMM channel (default: 1)")
    parser.add_argument("--reload", action="store_true", help="Enable auto-reload (development)")
    args = parser.parse_args()

    # Push CLI overrides into the module-level defaults. Only code that reads
    # these globals at request time observes the new values.
    _DEFAULT_ADDRESS = args.address
    _DEFAULT_CLASSIC = args.classic
    _DEFAULT_CHANNEL = args.channel

    if args.reload:
        # The reloader re-imports "fichero.api" in a worker process, where the
        # assignments above do not exist. The module reads these environment
        # variables at import, so export the overrides for the worker.
        if args.address:
            os.environ["FICHERO_ADDR"] = args.address
        if args.classic:
            os.environ["FICHERO_TRANSPORT"] = "classic"
        os.environ["FICHERO_CHANNEL"] = str(args.channel)

    # Pass the app object directly when not reloading so that the module-level
    # globals (_DEFAULT_ADDRESS etc.) set above are visible to the handlers.
    # The string form "fichero.api:app" is required for --reload only, because
    # uvicorn's reloader needs to re-import the module in a worker process.
    uvicorn.run(
        "fichero.api:app" if args.reload else app,
        host=args.host,
        port=args.port,
        reload=args.reload,
    )
|
||
|
||
|
||
# Allow running the server with ``python -m fichero.api``.
if __name__ == "__main__":
    main()
|