Implement HTTP API for Fichero D11s printer with status and print endpoints
This commit is contained in:
141
docs/PROTOCOL.md
141
docs/PROTOCOL.md
@@ -189,3 +189,144 @@ Fichero-branded printers:
|
||||
4. Traced the device class hierarchy: D11s -> AiYinNormalDevice -> BaseNormalDevice
|
||||
5. Found the AiYin-specific enable/stop commands that were different from the base class
|
||||
6. Tested every discovered command against the actual hardware and documented which ones work
|
||||
|
||||
---
|
||||
|
||||
## HTTP API (`fichero/api.py`)
|
||||
|
||||
A FastAPI-based REST server that wraps the printer logic.
|
||||
|
||||
### Installation
|
||||
|
||||
```bash
|
||||
pip install 'fichero-printer[api]'
|
||||
```
|
||||
|
||||
### Starting the server
|
||||
|
||||
```bash
|
||||
fichero-server [--host HOST] [--port PORT] [--address BLE_ADDR] [--classic] [--channel N]
|
||||
```
|
||||
|
||||
Default: `http://127.0.0.1:8765`.
|
||||
The BLE address can also be set via the `FICHERO_ADDR` environment variable.
|
||||
Interactive docs available at `http://127.0.0.1:8765/docs`.
|
||||
|
||||
---
|
||||
|
||||
### `GET /status`
|
||||
|
||||
Returns the real-time printer status.
|
||||
|
||||
**Query parameters** (all optional):
|
||||
|
||||
| Parameter | Type | Default | Description |
|
||||
|-----------|---------|------------------|-------------------------------------|
|
||||
| `address` | string | `FICHERO_ADDR` | BLE address (skips scanning) |
|
||||
| `classic` | boolean | `false` | Use Classic Bluetooth RFCOMM |
|
||||
| `channel` | integer | `1` | RFCOMM channel |
|
||||
|
||||
**Response `200`:**
|
||||
```json
|
||||
{
|
||||
"ok": true,
|
||||
"printing": false,
|
||||
"cover_open": false,
|
||||
"no_paper": false,
|
||||
"low_battery": false,
|
||||
"overheated": false,
|
||||
"charging": false,
|
||||
"raw": 0
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### `GET /info`
|
||||
|
||||
Returns static and dynamic printer information (model, firmware, serial, battery, …).
|
||||
|
||||
Same query parameters as `/status`.
|
||||
|
||||
**Response `200`:** JSON object with all info keys returned by the printer.
|
||||
|
||||
---
|
||||
|
||||
### `POST /print/text`
|
||||
|
||||
Print a plain-text label. Sends `multipart/form-data`.
|
||||
|
||||
**Form fields:**
|
||||
|
||||
| Field | Type | Default | Required | Description |
|
||||
|----------------|---------|---------|----------|--------------------------------------------------|
|
||||
| `text` | string | — | ✓ | Text to print |
|
||||
| `density` | integer | `2` | | Print density: 0=light, 1=medium, 2=dark |
|
||||
| `paper` | string | `gap` | | Paper type: `gap`, `black`, `continuous` (0-2) |
|
||||
| `copies` | integer | `1` | | Number of copies (1–99) |
|
||||
| `font_size` | integer | `30` | | Font size in points |
|
||||
| `label_length` | integer | — | | Label length in mm (overrides `label_height`) |
|
||||
| `label_height` | integer | `240` | | Label height in pixels |
|
||||
| `address` | string | — | | BLE address override |
|
||||
| `classic` | boolean | — | | Use Classic Bluetooth RFCOMM |
|
||||
| `channel` | integer | — | | RFCOMM channel |
|
||||
|
||||
**Response `200`:**
|
||||
```json
|
||||
{ "ok": true, "copies": 1, "text": "Hello World" }
|
||||
```
|
||||
|
||||
**Example (`curl`):**
|
||||
```bash
|
||||
curl -X POST http://127.0.0.1:8765/print/text \
|
||||
-F text="Hello World" \
|
||||
-F density=2 \
|
||||
-F paper=gap \
|
||||
-F label_length=30
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### `POST /print/image`
|
||||
|
||||
Print an image file. Sends `multipart/form-data`.
|
||||
|
||||
**Form fields:**
|
||||
|
||||
| Field | Type | Default | Required | Description |
|
||||
|----------------|---------|---------|----------|--------------------------------------------------|
|
||||
| `file` | file | — | ✓ | Image file (PNG, JPEG, BMP, GIF, TIFF, WEBP) |
|
||||
| `density` | integer | `2` | | Print density: 0=light, 1=medium, 2=dark |
|
||||
| `paper` | string | `gap` | | Paper type: `gap`, `black`, `continuous` (0-2) |
|
||||
| `copies` | integer | `1` | | Number of copies (1–99) |
|
||||
| `dither` | boolean | `true` | | Apply Floyd-Steinberg dithering |
|
||||
| `label_length` | integer | — | | Max label length in mm (overrides `label_height`)|
|
||||
| `label_height` | integer | `240` | | Max label height in pixels |
|
||||
| `address` | string | — | | BLE address override |
|
||||
| `classic` | boolean | — | | Use Classic Bluetooth RFCOMM |
|
||||
| `channel` | integer | — | | RFCOMM channel |
|
||||
|
||||
**Response `200`:**
|
||||
```json
|
||||
{ "ok": true, "copies": 1, "filename": "label.png" }
|
||||
```
|
||||
|
||||
**Example (`curl`):**
|
||||
```bash
|
||||
curl -X POST http://127.0.0.1:8765/print/image \
|
||||
-F file=@label.png \
|
||||
-F density=2 \
|
||||
-F dither=true \
|
||||
-F label_length=40
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Error responses
|
||||
|
||||
| Status | Meaning |
|
||||
|--------|---------------------------------------------------------|
|
||||
| `404` | Printer not found (BLE scan failed or address invalid) |
|
||||
| `422` | Validation error (bad parameter value or empty file) |
|
||||
| `502` | Printer communication error |
|
||||
| `504` | Printer timed out |
|
||||
|
||||
289
fichero/api.py
Normal file
289
fichero/api.py
Normal file
@@ -0,0 +1,289 @@
|
||||
"""HTTP REST API for the Fichero D11s thermal label printer.
|
||||
|
||||
Start with:
|
||||
fichero-server [--host HOST] [--port PORT]
|
||||
or:
|
||||
python -m fichero.api
|
||||
|
||||
Endpoints:
|
||||
GET /status – Printer status
|
||||
GET /info – Printer info (model, firmware, battery, …)
|
||||
POST /print/text – Print a text label
|
||||
POST /print/image – Print an uploaded image file
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import io
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from PIL import Image
|
||||
|
||||
from fichero.cli import DOTS_PER_MM, do_print
|
||||
from fichero.imaging import text_to_image
|
||||
from fichero.printer import (
|
||||
PAPER_GAP,
|
||||
PrinterError,
|
||||
PrinterNotFound,
|
||||
PrinterTimeout,
|
||||
connect,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Global connection settings (env vars or CLI flags at startup)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_DEFAULT_ADDRESS: str | None = os.environ.get("FICHERO_ADDR")
|
||||
_DEFAULT_CLASSIC: bool = os.environ.get("FICHERO_TRANSPORT", "").lower() == "classic"
|
||||
_DEFAULT_CHANNEL: int = int(os.environ.get("FICHERO_CHANNEL", "1"))
|
||||
|
||||
_PAPER_MAP = {"gap": 0, "black": 1, "continuous": 2}
|
||||
|
||||
|
||||
def _parse_paper(value: str) -> int:
|
||||
if value in _PAPER_MAP:
|
||||
return _PAPER_MAP[value]
|
||||
try:
|
||||
val = int(value)
|
||||
if 0 <= val <= 2:
|
||||
return val
|
||||
except ValueError:
|
||||
pass
|
||||
raise HTTPException(status_code=422, detail=f"Invalid paper type '{value}'. Use gap, black, continuous or 0-2.")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# FastAPI app
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI): # noqa: ARG001
|
||||
yield
|
||||
|
||||
|
||||
app = FastAPI(
|
||||
title="Fichero Printer API",
|
||||
description="REST API for the Fichero D11s (AiYin) thermal label printer.",
|
||||
version="0.1.0",
|
||||
lifespan=lifespan,
|
||||
)
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
|
||||
def _address(address: str | None) -> str | None:
|
||||
"""Return the effective BLE address (request value overrides env default)."""
|
||||
return address or _DEFAULT_ADDRESS
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@app.get(
|
||||
"/status",
|
||||
summary="Get printer status",
|
||||
response_description="Current printer status flags",
|
||||
)
|
||||
async def get_status(
|
||||
address: str | None = None,
|
||||
classic: bool = _DEFAULT_CLASSIC,
|
||||
channel: int = _DEFAULT_CHANNEL,
|
||||
):
|
||||
"""Return the real-time status of the printer (paper, battery, heat, …)."""
|
||||
try:
|
||||
async with connect(_address(address), classic=classic, channel=channel) as pc:
|
||||
status = await pc.get_status()
|
||||
except PrinterNotFound as exc:
|
||||
raise HTTPException(status_code=404, detail=str(exc)) from exc
|
||||
except PrinterTimeout as exc:
|
||||
raise HTTPException(status_code=504, detail=str(exc)) from exc
|
||||
except PrinterError as exc:
|
||||
raise HTTPException(status_code=502, detail=str(exc)) from exc
|
||||
|
||||
return {
|
||||
"ok": status.ok,
|
||||
"printing": status.printing,
|
||||
"cover_open": status.cover_open,
|
||||
"no_paper": status.no_paper,
|
||||
"low_battery": status.low_battery,
|
||||
"overheated": status.overheated,
|
||||
"charging": status.charging,
|
||||
"raw": status.raw,
|
||||
}
|
||||
|
||||
|
||||
@app.get(
|
||||
"/info",
|
||||
summary="Get printer info",
|
||||
response_description="Model, firmware, serial number and battery level",
|
||||
)
|
||||
async def get_info(
|
||||
address: str | None = None,
|
||||
classic: bool = _DEFAULT_CLASSIC,
|
||||
channel: int = _DEFAULT_CHANNEL,
|
||||
):
|
||||
"""Return static and dynamic printer information."""
|
||||
try:
|
||||
async with connect(_address(address), classic=classic, channel=channel) as pc:
|
||||
info = await pc.get_info()
|
||||
info.update(await pc.get_all_info())
|
||||
except PrinterNotFound as exc:
|
||||
raise HTTPException(status_code=404, detail=str(exc)) from exc
|
||||
except PrinterTimeout as exc:
|
||||
raise HTTPException(status_code=504, detail=str(exc)) from exc
|
||||
except PrinterError as exc:
|
||||
raise HTTPException(status_code=502, detail=str(exc)) from exc
|
||||
|
||||
return info
|
||||
|
||||
|
||||
@app.post(
|
||||
"/print/text",
|
||||
summary="Print a text label",
|
||||
status_code=200,
|
||||
)
|
||||
async def print_text(
|
||||
text: Annotated[str, Form(description="Text to print on the label")],
|
||||
density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
|
||||
paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
|
||||
copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
|
||||
font_size: Annotated[int, Form(description="Font size in points", ge=6, le=200)] = 30,
|
||||
label_length: Annotated[int | None, Form(description="Label length in mm (overrides label_height)", ge=5, le=500)] = None,
|
||||
label_height: Annotated[int, Form(description="Label height in pixels", ge=40, le=4000)] = 240,
|
||||
address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
|
||||
classic: Annotated[bool, Form(description="Use Classic Bluetooth RFCOMM")] = _DEFAULT_CLASSIC,
|
||||
channel: Annotated[int, Form(description="RFCOMM channel")] = _DEFAULT_CHANNEL,
|
||||
):
|
||||
"""Print a plain-text label.
|
||||
|
||||
The text is rendered as a 96 px wide, 1-bit image and sent to the printer.
|
||||
"""
|
||||
paper_val = _parse_paper(paper)
|
||||
max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height
|
||||
|
||||
img = text_to_image(text, font_size=font_size, label_height=max_rows)
|
||||
|
||||
try:
|
||||
async with connect(_address(address), classic=classic, channel=channel) as pc:
|
||||
ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies,
|
||||
dither=False, max_rows=max_rows)
|
||||
except PrinterNotFound as exc:
|
||||
raise HTTPException(status_code=404, detail=str(exc)) from exc
|
||||
except PrinterTimeout as exc:
|
||||
raise HTTPException(status_code=504, detail=str(exc)) from exc
|
||||
except PrinterError as exc:
|
||||
raise HTTPException(status_code=502, detail=str(exc)) from exc
|
||||
|
||||
if not ok:
|
||||
raise HTTPException(status_code=502, detail="Printer did not confirm completion.")
|
||||
|
||||
return {"ok": True, "copies": copies, "text": text}
|
||||
|
||||
|
||||
@app.post(
|
||||
"/print/image",
|
||||
summary="Print an image",
|
||||
status_code=200,
|
||||
)
|
||||
async def print_image(
|
||||
file: Annotated[UploadFile, File(description="Image file to print (PNG, JPEG, BMP, …)")],
|
||||
density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
|
||||
paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
|
||||
copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
|
||||
dither: Annotated[bool, Form(description="Apply Floyd-Steinberg dithering")] = True,
|
||||
label_length: Annotated[int | None, Form(description="Max label length in mm (overrides label_height)", ge=5, le=500)] = None,
|
||||
label_height: Annotated[int, Form(description="Max label height in pixels", ge=40, le=4000)] = 240,
|
||||
address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
|
||||
classic: Annotated[bool, Form(description="Use Classic Bluetooth RFCOMM")] = _DEFAULT_CLASSIC,
|
||||
channel: Annotated[int, Form(description="RFCOMM channel")] = _DEFAULT_CHANNEL,
|
||||
):
|
||||
"""Print an image file.
|
||||
|
||||
The image is resized to 96 px wide, optionally dithered to 1-bit, and sent to the printer.
|
||||
Supported formats: PNG, JPEG, BMP, GIF, TIFF, WEBP.
|
||||
"""
|
||||
# Validate content type loosely — Pillow will raise on unsupported data
|
||||
data = await file.read()
|
||||
if not data:
|
||||
raise HTTPException(status_code=422, detail="Uploaded file is empty.")
|
||||
|
||||
try:
|
||||
img = Image.open(io.BytesIO(data))
|
||||
img.load() # ensure the image is fully decoded
|
||||
except Exception as exc:
|
||||
raise HTTPException(status_code=422, detail=f"Cannot decode image: {exc}") from exc
|
||||
|
||||
paper_val = _parse_paper(paper)
|
||||
max_rows = (label_length * DOTS_PER_MM) if label_length is not None else label_height
|
||||
|
||||
try:
|
||||
async with connect(_address(address), classic=classic, channel=channel) as pc:
|
||||
ok = await do_print(pc, img, density=density, paper=paper_val, copies=copies,
|
||||
dither=dither, max_rows=max_rows)
|
||||
except PrinterNotFound as exc:
|
||||
raise HTTPException(status_code=404, detail=str(exc)) from exc
|
||||
except PrinterTimeout as exc:
|
||||
raise HTTPException(status_code=504, detail=str(exc)) from exc
|
||||
except PrinterError as exc:
|
||||
raise HTTPException(status_code=502, detail=str(exc)) from exc
|
||||
|
||||
if not ok:
|
||||
raise HTTPException(status_code=502, detail="Printer did not confirm completion.")
|
||||
|
||||
return {"ok": True, "copies": copies, "filename": file.filename}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main() -> None:
|
||||
"""Start the Fichero HTTP API server."""
|
||||
try:
|
||||
import uvicorn # noqa: PLC0415
|
||||
except ImportError:
|
||||
print("ERROR: uvicorn is required to run the API server.")
|
||||
print("Install it with: pip install 'fichero-printer[api]'")
|
||||
raise SystemExit(1) from None
|
||||
|
||||
parser = argparse.ArgumentParser(description="Fichero Printer API Server")
|
||||
parser.add_argument("--host", default="127.0.0.1", help="Bind host (default: 127.0.0.1)")
|
||||
parser.add_argument("--port", type=int, default=8765, help="Bind port (default: 8765)")
|
||||
parser.add_argument("--address", default=_DEFAULT_ADDRESS, metavar="BLE_ADDR",
|
||||
help="Default BLE address (or set FICHERO_ADDR env var)")
|
||||
parser.add_argument("--classic", action="store_true", default=_DEFAULT_CLASSIC,
|
||||
help="Default to Classic Bluetooth RFCOMM")
|
||||
parser.add_argument("--channel", type=int, default=_DEFAULT_CHANNEL,
|
||||
help="Default RFCOMM channel (default: 1)")
|
||||
parser.add_argument("--reload", action="store_true", help="Enable auto-reload (development)")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Push CLI overrides into module-level defaults so all handlers pick them up
|
||||
global _DEFAULT_ADDRESS, _DEFAULT_CLASSIC, _DEFAULT_CHANNEL
|
||||
_DEFAULT_ADDRESS = args.address
|
||||
_DEFAULT_CLASSIC = args.classic
|
||||
_DEFAULT_CHANNEL = args.channel
|
||||
|
||||
uvicorn.run(
|
||||
"fichero.api:app",
|
||||
host=args.host,
|
||||
port=args.port,
|
||||
reload=args.reload,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -9,6 +9,13 @@ dependencies = [
|
||||
"pillow",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
api = [
|
||||
"fastapi>=0.111",
|
||||
"uvicorn[standard]>=0.29",
|
||||
"python-multipart>=0.0.9",
|
||||
]
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
@@ -18,3 +25,4 @@ packages = ["fichero"]
|
||||
|
||||
[project.scripts]
|
||||
fichero = "fichero.cli:main"
|
||||
fichero-server = "fichero.api:main"
|
||||
|
||||
Reference in New Issue
Block a user