@@ -0,0 +1,418 @@
""" HTTP REST API for the Fichero D11s thermal label printer.
Start with:
fichero-server [--host HOST] [--port PORT]
or:
python -m fichero.api
Endpoints:
GET /status – Printer status
GET /info – Printer info (model, firmware, battery, …)
POST /print/text – Print a text label
POST /print/image – Print an uploaded image file
"""
from __future__ import annotations
import argparse
import asyncio
import io
import re
import os
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Annotated
from fastapi import FastAPI , File , Form , HTTPException , UploadFile
from fastapi . middleware . cors import CORSMiddleware
from fastapi . openapi . docs import get_swagger_ui_html
from fastapi . responses import HTMLResponse , RedirectResponse
from PIL import Image
from fichero . cli import DOTS_PER_MM , do_print
from fichero . imaging import text_to_image
from fichero . printer import (
PAPER_GAP ,
PrinterError ,
PrinterNotFound ,
PrinterTimeout ,
connect ,
)
# ---------------------------------------------------------------------------
# Global connection settings (env vars or CLI flags at startup)
# ---------------------------------------------------------------------------
_DEFAULT_ADDRESS : str | None = os . environ . get ( " FICHERO_ADDR " )
_DEFAULT_CLASSIC : bool = os . environ . get ( " FICHERO_TRANSPORT " , " " ) . lower ( ) == " classic "
_DEFAULT_CHANNEL : int = int ( os . environ . get ( " FICHERO_CHANNEL " , " 1 " ) )
_PAPER_MAP = { " gap " : 0 , " black " : 1 , " continuous " : 2 }
def _parse_paper ( value : str ) - > int :
if value in _PAPER_MAP :
return _PAPER_MAP [ value ]
try :
val = int ( value )
if 0 < = val < = 2 :
return val
except ValueError :
pass
raise HTTPException ( status_code = 422 , detail = f " Invalid paper type ' { value } ' . Use gap, black, continuous or 0-2. " )
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan ( app : FastAPI ) : # noqa: ARG001
yield
app = FastAPI (
title = " Fichero Printer API " ,
description = " REST API for the Fichero D11s (AiYin) thermal label printer. " ,
version = " 0.1.26 " ,
lifespan = lifespan ,
docs_url = None ,
redoc_url = None ,
)
app . add_middleware (
CORSMiddleware ,
allow_origins = [ " * " ] ,
allow_methods = [ " * " ] ,
allow_headers = [ " * " ] ,
)
def _address ( address : str | None ) - > str | None :
""" Return the effective BLE address (request value overrides env default). """
return address or _DEFAULT_ADDRESS
def _ui_html ( ) - > str :
default_address = _DEFAULT_ADDRESS or " "
default_transport = " classic " if _DEFAULT_CLASSIC else " ble "
try :
template_path = Path ( __file__ ) . parent / " index.html "
template = template_path . read_text ( encoding = " utf-8 " )
except FileNotFoundError :
return " <h1>Error: index.html not found</h1> "
# Simple substitution for initial values
return (
template . replace ( " {default_address} " , default_address )
. replace ( " {ble_selected} " , " selected " if default_transport == " ble " else " " )
. replace ( " {classic_selected} " , " selected " if default_transport == " classic " else " " )
. replace ( " {default_channel} " , str ( _DEFAULT_CHANNEL ) )
)
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get ( " / " , include_in_schema = False , response_class = HTMLResponse )
async def root ( ) :
""" Serve a compact printer UI for Home Assistant. """
return HTMLResponse ( _ui_html ( ) )
@app.get ( " /docs " , include_in_schema = False )
async def docs ( ) :
""" Serve Swagger UI with ingress-safe relative OpenAPI URL. """
return get_swagger_ui_html (
openapi_url = " openapi.json " ,
title = f " { app . title } - Swagger UI " ,
)
@app.get (
" /status " ,
summary = " Get printer status " ,
response_description = " Current printer status flags " ,
)
async def get_status (
address : str | None = None ,
classic : bool = _DEFAULT_CLASSIC ,
channel : int = _DEFAULT_CHANNEL ,
) :
""" Return the real-time status of the printer (paper, battery, heat, …). """
try :
async with connect ( _address ( address ) , classic = classic , channel = channel ) as pc :
status = await pc . get_status ( )
except PrinterNotFound as exc :
raise HTTPException ( status_code = 404 , detail = str ( exc ) ) from exc
except PrinterTimeout as exc :
raise HTTPException ( status_code = 504 , detail = str ( exc ) ) from exc
except PrinterError as exc :
raise HTTPException ( status_code = 502 , detail = str ( exc ) ) from exc
return {
" ok " : status . ok ,
" printing " : status . printing ,
" cover_open " : status . cover_open ,
" no_paper " : status . no_paper ,
" low_battery " : status . low_battery ,
" overheated " : status . overheated ,
" charging " : status . charging ,
" raw " : status . raw ,
}
@app.get (
" /info " ,
summary = " Get printer info " ,
response_description = " Model, firmware, serial number and battery level " ,
)
async def get_info (
address : str | None = None ,
classic : bool = _DEFAULT_CLASSIC ,
channel : int = _DEFAULT_CHANNEL ,
) :
""" Return static and dynamic printer information. """
try :
async with connect ( _address ( address ) , classic = classic , channel = channel ) as pc :
info = await pc . get_info ( )
info . update ( await pc . get_all_info ( ) )
except PrinterNotFound as exc :
raise HTTPException ( status_code = 404 , detail = str ( exc ) ) from exc
except PrinterTimeout as exc :
raise HTTPException ( status_code = 504 , detail = str ( exc ) ) from exc
except PrinterError as exc :
raise HTTPException ( status_code = 502 , detail = str ( exc ) ) from exc
return info
@app.post (
" /pair " ,
summary = " Pair and trust a Bluetooth device " ,
status_code = 200 ,
)
async def pair_device (
address : Annotated [ str | None , Form ( description = " Device address (optional, overrides FICHERO_ADDR) " ) ] = None ,
) :
"""
Attempt to pair and trust the device using `bluetoothctl`.
This is intended for setting up Classic Bluetooth connections.
"""
addr = _address ( address )
if not addr :
raise HTTPException ( status_code = 422 , detail = " Address is required to pair. " )
# Basic validation for MAC address format to mitigate injection risk.
if not re . match ( r " ^[0-9A-F] {2} (:[0-9A-F] {2} ) {5} $ " , addr , re . IGNORECASE ) :
raise HTTPException ( status_code = 422 , detail = f " Invalid address format: { addr } " )
cmd = f ' echo -e " pair { addr } \\ ntrust { addr } \\ nquit " | bluetoothctl '
try :
proc = await asyncio . create_subprocess_shell (
cmd ,
stdout = asyncio . subprocess . PIPE ,
stderr = asyncio . subprocess . PIPE ,
)
stdout , stderr = await asyncio . wait_for ( proc . communicate ( ) , timeout = 15.0 )
except FileNotFoundError :
raise HTTPException ( status_code = 500 , detail = " `bluetoothctl` command not found. Is BlueZ installed and in PATH? " )
except asyncio . TimeoutError :
raise HTTPException ( status_code = 504 , detail = " `bluetoothctl` command timed out after 15 seconds. " )
output = stdout . decode ( errors = " ignore " )
error = stderr . decode ( errors = " ignore " )
if " Failed to pair " in output or " not available " in output . lower ( ) :
raise HTTPException ( status_code = 502 , detail = f " Pairing failed. Output: { output } . Error: { error } " )
return { " ok " : True , " message " : " Pair/trust command sent. Check output for details. " , " output " : output , " error " : error }
@app.post (
" /unpair " ,
summary = " Unpair a Bluetooth device " ,
status_code = 200 ,
)
async def unpair_device (
address : Annotated [ str | None , Form ( description = " Device address (optional, overrides FICHERO_ADDR) " ) ] = None ,
) :
"""
Attempt to unpair the device using `bluetoothctl`.
"""
addr = _address ( address )
if not addr :
raise HTTPException ( status_code = 422 , detail = " Address is required to unpair. " )
# Basic validation for MAC address format to mitigate injection risk.
if not re . match ( r " ^[0-9A-F] {2} (:[0-9A-F] {2} ) {5} $ " , addr , re . IGNORECASE ) :
raise HTTPException ( status_code = 422 , detail = f " Invalid address format: { addr } " )
cmd = f ' echo -e " remove { addr } \\ nquit " | bluetoothctl '
try :
proc = await asyncio . create_subprocess_shell (
cmd ,
stdout = asyncio . subprocess . PIPE ,
stderr = asyncio . subprocess . PIPE ,
)
stdout , stderr = await asyncio . wait_for ( proc . communicate ( ) , timeout = 15.0 )
except FileNotFoundError :
raise HTTPException ( status_code = 500 , detail = " `bluetoothctl` command not found. Is BlueZ installed and in PATH? " )
except asyncio . TimeoutError :
raise HTTPException ( status_code = 504 , detail = " `bluetoothctl` command timed out after 15 seconds. " )
output = stdout . decode ( errors = " ignore " )
error = stderr . decode ( errors = " ignore " )
if " Failed to remove " in output or " not available " in output . lower ( ) :
raise HTTPException ( status_code = 502 , detail = f " Unpairing failed. Output: { output } . Error: { error } " )
return { " ok " : True , " message " : " Unpair command sent. Check output for details. " , " output " : output , " error " : error }
@app.post (
" /print/text " ,
summary = " Print a text label " ,
status_code = 200 ,
)
async def print_text (
text : Annotated [ str , Form ( description = " Text to print on the label " ) ] ,
density : Annotated [ int , Form ( description = " Print density: 0=light, 1=medium, 2=dark " , ge = 0 , le = 2 ) ] = 2 ,
paper : Annotated [ str , Form ( description = " Paper type: gap, black, or continuous " ) ] = " gap " ,
copies : Annotated [ int , Form ( description = " Number of copies " , ge = 1 , le = 99 ) ] = 1 ,
font_size : Annotated [ int , Form ( description = " Font size in points " , ge = 6 , le = 200 ) ] = 30 ,
label_length : Annotated [ int | None , Form ( description = " Label length in mm (overrides label_height) " , ge = 5 , le = 500 ) ] = None ,
label_height : Annotated [ int , Form ( description = " Label height in pixels " , ge = 40 , le = 4000 ) ] = 240 ,
address : Annotated [ str | None , Form ( description = " BLE address (optional, overrides FICHERO_ADDR) " ) ] = None ,
classic : Annotated [ bool , Form ( description = " Use Classic Bluetooth RFCOMM " ) ] = _DEFAULT_CLASSIC ,
channel : Annotated [ int , Form ( description = " RFCOMM channel " ) ] = _DEFAULT_CHANNEL ,
) :
""" Print a plain-text label.
The text is rendered as a 96 px wide, 1-bit image and sent to the printer.
"""
paper_val = _parse_paper ( paper )
max_rows = ( label_length * DOTS_PER_MM ) if label_length is not None else label_height
img = text_to_image ( text , font_size = font_size , label_height = max_rows )
try :
async with connect ( _address ( address ) , classic = classic , channel = channel ) as pc :
ok = await do_print ( pc , img , density = density , paper = paper_val , copies = copies ,
dither = False , max_rows = max_rows )
except PrinterNotFound as exc :
raise HTTPException ( status_code = 404 , detail = str ( exc ) ) from exc
except PrinterTimeout as exc :
raise HTTPException ( status_code = 504 , detail = str ( exc ) ) from exc
except PrinterError as exc :
raise HTTPException ( status_code = 502 , detail = str ( exc ) ) from exc
if not ok :
raise HTTPException ( status_code = 502 , detail = " Printer did not confirm completion. " )
return { " ok " : True , " copies " : copies , " text " : text }
@app.post (
" /print/image " ,
summary = " Print an image " ,
status_code = 200 ,
)
async def print_image (
file : Annotated [ UploadFile , File ( description = " Image file to print (PNG, JPEG, BMP, …) " ) ] ,
density : Annotated [ int , Form ( description = " Print density: 0=light, 1=medium, 2=dark " , ge = 0 , le = 2 ) ] = 2 ,
paper : Annotated [ str , Form ( description = " Paper type: gap, black, or continuous " ) ] = " gap " ,
copies : Annotated [ int , Form ( description = " Number of copies " , ge = 1 , le = 99 ) ] = 1 ,
dither : Annotated [ bool , Form ( description = " Apply Floyd-Steinberg dithering " ) ] = True ,
label_length : Annotated [ int | None , Form ( description = " Max label length in mm (overrides label_height) " , ge = 5 , le = 500 ) ] = None ,
label_height : Annotated [ int , Form ( description = " Max label height in pixels " , ge = 40 , le = 4000 ) ] = 240 ,
address : Annotated [ str | None , Form ( description = " BLE address (optional, overrides FICHERO_ADDR) " ) ] = None ,
classic : Annotated [ bool , Form ( description = " Use Classic Bluetooth RFCOMM " ) ] = _DEFAULT_CLASSIC ,
channel : Annotated [ int , Form ( description = " RFCOMM channel " ) ] = _DEFAULT_CHANNEL ,
) :
""" Print an image file.
The image is resized to 96 px wide, optionally dithered to 1-bit, and sent to the printer.
Supported formats: PNG, JPEG, BMP, GIF, TIFF, WEBP.
"""
# Validate content type loosely — Pillow will raise on unsupported data
data = await file . read ( )
if not data :
raise HTTPException ( status_code = 422 , detail = " Uploaded file is empty. " )
try :
img = Image . open ( io . BytesIO ( data ) )
img . load ( ) # ensure the image is fully decoded
except Exception as exc :
raise HTTPException ( status_code = 422 , detail = f " Cannot decode image: { exc } " ) from exc
paper_val = _parse_paper ( paper )
max_rows = ( label_length * DOTS_PER_MM ) if label_length is not None else label_height
try :
async with connect ( _address ( address ) , classic = classic , channel = channel ) as pc :
ok = await do_print ( pc , img , density = density , paper = paper_val , copies = copies ,
dither = dither , max_rows = max_rows )
except PrinterNotFound as exc :
raise HTTPException ( status_code = 404 , detail = str ( exc ) ) from exc
except PrinterTimeout as exc :
raise HTTPException ( status_code = 504 , detail = str ( exc ) ) from exc
except PrinterError as exc :
raise HTTPException ( status_code = 502 , detail = str ( exc ) ) from exc
if not ok :
raise HTTPException ( status_code = 502 , detail = " Printer did not confirm completion. " )
return { " ok " : True , " copies " : copies , " filename " : file . filename }
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main ( ) - > None :
""" Start the Fichero HTTP API server. """
global _DEFAULT_ADDRESS , _DEFAULT_CLASSIC , _DEFAULT_CHANNEL
try :
import uvicorn # noqa: PLC0415
except ImportError :
print ( " ERROR: uvicorn is required to run the API server. " )
print ( " Install it with: pip install ' fichero-printer[api] ' " )
raise SystemExit ( 1 ) from None
parser = argparse . ArgumentParser ( description = " Fichero Printer API Server " )
parser . add_argument ( " --host " , default = " 127.0.0.1 " , help = " Bind host (default: 127.0.0.1) " )
parser . add_argument ( " --port " , type = int , default = 8765 , help = " Bind port (default: 8765) " )
parser . add_argument ( " --address " , default = _DEFAULT_ADDRESS , metavar = " BLE_ADDR " ,
help = " Default BLE address (or set FICHERO_ADDR env var) " )
parser . add_argument ( " --classic " , action = " store_true " , default = _DEFAULT_CLASSIC ,
help = " Default to Classic Bluetooth RFCOMM " )
parser . add_argument ( " --channel " , type = int , default = _DEFAULT_CHANNEL ,
help = " Default RFCOMM channel (default: 1) " )
parser . add_argument ( " --reload " , action = " store_true " , help = " Enable auto-reload (development) " )
args = parser . parse_args ( )
# Push CLI overrides into module-level defaults so all handlers pick them up
_DEFAULT_ADDRESS = args . address
_DEFAULT_CLASSIC = args . classic
_DEFAULT_CHANNEL = args . channel
# Pass the app object directly when not reloading so that the module-level
# globals (_DEFAULT_ADDRESS etc.) set above are visible to the handlers.
# The string form "fichero.api:app" is required for --reload only, because
# uvicorn's reloader needs to re-import the module in a worker process.
uvicorn . run (
" fichero.api:app " if args . reload else app ,
host = args . host ,
port = args . port ,
reload = args . reload ,
)
if __name__ == " __main__ " :
main ( )