Files
Fichero/fichero_printer/fichero/api.py

618 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""HTTP REST API for the Fichero D11s thermal label printer.
Start with:
fichero-server [--host HOST] [--port PORT]
or:
python -m fichero.api
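Endpoints:
GET /status Printer status
GET /info Printer info (model, firmware, battery, …)
GET /scan Scan for nearby BLE devices (debugging aid)
POST /pair Pair and trust a Classic Bluetooth device
POST /unpair Unpair a Bluetooth device
POST /print/text Print a text label
POST /print/image Print an uploaded image file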
"""
from __future__ import annotations
import argparse
import asyncio
import io
import re
import os
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Annotated
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from PIL import Image
from bleak import BleakScanner
from fichero.cli import DOTS_PER_MM, do_print
from fichero.imaging import text_to_image
from fichero.printer import (
PAPER_GAP,
PrinterError,
PrinterNotFound,
PrinterTimeout,
connect,
)
# ---------------------------------------------------------------------------
# Global connection settings (env vars or CLI flags at startup)
# ---------------------------------------------------------------------------
_DEFAULT_ADDRESS: str | None = os.environ.get("FICHERO_ADDR")
# Default to BLE transport (most reliable for Fichero/D11s printers)
# Set FICHERO_TRANSPORT=classic to force Classic Bluetooth (RFCOMM)
_DEFAULT_CLASSIC: bool = os.environ.get("FICHERO_TRANSPORT", "").lower() == "classic"
_DEFAULT_CHANNEL: int = int(os.environ.get("FICHERO_CHANNEL", "1"))
_PAPER_MAP = {"gap": 0, "black": 1, "continuous": 2}
def _parse_paper(value: str) -> int:
if value in _PAPER_MAP:
return _PAPER_MAP[value]
try:
val = int(value)
if 0 <= val <= 2:
return val
except ValueError:
pass
raise HTTPException(status_code=422, detail=f"Invalid paper type '{value}'. Use gap, black, continuous or 0-2.")
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):  # noqa: ARG001
    """Application lifespan hook that performs no startup or shutdown work.

    The ``app`` argument is accepted but not used; control is handed back
    immediately via the bare ``yield``.
    """
    yield
# The built-in documentation routes are switched off (docs_url/redoc_url set
# to None); this module registers its own "/docs" route instead.
app = FastAPI(
    title="Fichero Printer API",
    description="REST API for the Fichero D11s (AiYin) thermal label printer.",
    version = "0.1.36",
    lifespan=lifespan,
    docs_url=None,
    redoc_url=None,
)
# CORS is fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Serve static files for the modern web UI (if built and present in 'web' dir)
_WEB_ROOT = Path(__file__).parent / "web"
if _WEB_ROOT.exists():
    # Typical SPA assets folder; mounted only when it actually exists on disk.
    if (_WEB_ROOT / "assets").exists():
        app.mount("/assets", StaticFiles(directory=_WEB_ROOT / "assets"), name="assets")
def _address(address: str | None) -> str | None:
"""Return the effective BLE address (request value overrides env default)."""
return address or _DEFAULT_ADDRESS
def _ui_html() -> str:
    """Build the legacy single-page UI.

    Reads ``index.html`` next to this module, fills in the connection
    defaults, then injects the "Debug Scan" card and its JavaScript.
    Returns a short HTML error message when the template file is missing.
    """
    try:
        page = (Path(__file__).parent / "index.html").read_text(encoding="utf-8")
    except FileNotFoundError:
        return "<h1>Error: index.html not found</h1>"

    # Placeholder substitutions, applied in this exact order.
    substitutions = (
        ("{default_address}", _DEFAULT_ADDRESS or ""),
        ("{ble_selected}", "" if _DEFAULT_CLASSIC else " selected"),
        ("{classic_selected}", " selected" if _DEFAULT_CLASSIC else ""),
        ("{default_channel}", str(_DEFAULT_CHANNEL)),
    )
    for placeholder, replacement in substitutions:
        page = page.replace(placeholder, replacement)

    # Markup for the debug scan card.
    debug_card = """
<div class="card">
<h2>Debug Scan</h2>
<p class="muted">Scans for all nearby BLE devices to help with debugging connection issues.</p>
<button type="button" onclick="scanForDevices()" id="scan-button">
<span class="loading" id="scan-loading" style="display: none;"></span>
<span id="scan-text">Scan for BLE Devices (10s)</span>
</button>
<pre id="scan-results">📱 Click "Scan for BLE Devices" to search for nearby Bluetooth devices...</pre>
</div>
"""
    # Client-side logic for the card (raw string: backslashes belong to the JS).
    debug_js = r'''
// Scan for BLE Devices function
async function scanForDevices() {
console.log('Scan function called - checking elements...');
const resultsEl = document.getElementById('scan-results');
const scanButton = document.getElementById('scan-button');
const loadingEl = document.getElementById('scan-loading');
const textEl = document.getElementById('scan-text');
console.log('Elements found:', { resultsEl, scanButton, loadingEl, textEl });
// Show loading state
scanButton.disabled = true;
loadingEl.style.display = 'inline-block';
textEl.textContent = 'Scanning...';
resultsEl.textContent = '🔍 Searching for BLE devices (this may take up to 10 seconds)...';
console.log('Starting scan request...');
try {
const response = await fetch('scan');
let responseData;
try {
responseData = await response.json();
} catch (e) {
const text = await response.text();
throw new Error(`Invalid JSON response: ${text}`);
}
if (!response.ok) {
const errorDetail = responseData.detail || `HTTP error! status: ${response.status}`;
throw new Error(errorDetail);
}
const devices = responseData;
if (devices.length === 0) {
resultsEl.textContent = '📡 No BLE devices found.\n\nTroubleshooting tips:\n- Make sure your printer is powered on\n- Ensure Bluetooth is enabled on this device\n- Bring the printer closer (within 5 meters)\n- Try restarting the printer';
} else {
let resultText = '🎉 Found ' + devices.length + ' device(s):\n\n';
devices.forEach((d, index) => {
resultText += `${index + 1}. ${d.name || 'Unknown Device'}\n`;
resultText += ` Address: ${d.address}\n`;
// Handle case where RSSI might not be available
if (d.rssi !== undefined) {
resultText += ` Signal: ${d.rssi} dBm (${Math.abs(d.rssi) < 60 ? 'Strong' : Math.abs(d.rssi) < 80 ? 'Good' : 'Weak'})\n`;
} else {
resultText += ` Signal: Not available\n`;
}
if (d.metadata) {
resultText += ` Metadata: ${d.metadata}\n`;
}
resultText += ` ${'='.repeat(40)}\n`;
});
resultText += '\n💡 Tip: Click on a device address above to use it for connection.';
resultsEl.textContent = resultText;
}
} catch (e) {
resultsEl.textContent = '❌ Error during scan: ' + e.message + '\n\nPossible causes:\n- Bluetooth adapter not available\n- Missing permissions\n- Bluetooth service not running';
console.error('Scan error:', e);
} finally {
// Reset button state
console.log('Resetting button state...');
scanButton.disabled = false;
loadingEl.style.display = 'none';
textEl.textContent = 'Scan for BLE Devices (10s)';
console.log('Scan completed. Final result:', resultsEl.textContent);
}
}
'''

    # Place the card right after the first </main>, else right before the
    # first </body>, else at the very end of the document.
    if "</main>" in page:
        page = page.replace("</main>", "</main>" + debug_card, 1)
    elif "</body>" in page:
        page = page.replace("</body>", debug_card + "</body>", 1)
    else:
        page += debug_card

    # Append the script to the last <script> element, or add a new element
    # when the template has none.
    before, closing_tag, after = page.rpartition("</script>")
    if closing_tag:
        return before + debug_js + closing_tag + after
    return page + f"<script>{debug_js}</script>"
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/", include_in_schema=False, response_class=HTMLResponse)
async def root(legacy: bool = False):
    """Serve the printer web UI (compact enough for Home Assistant)."""
    # The built SPA takes precedence when present; ?legacy=true forces the
    # template-based UI instead.
    if not legacy:
        spa_index = _WEB_ROOT / "index.html"
        if spa_index.exists():
            return HTMLResponse(spa_index.read_text(encoding="utf-8"))
    return HTMLResponse(_ui_html())
@app.get("/docs", include_in_schema=False)
async def docs():
    """Serve Swagger UI, pointing it at a relative (ingress-safe) OpenAPI URL."""
    page_title = f"{app.title} - Swagger UI"
    return get_swagger_ui_html(openapi_url="openapi.json", title=page_title)
@app.get(
    "/status",
    summary="Get printer status",
    response_description="Current printer status flags",
)
async def get_status(
    address: str | None = None,
    classic: bool = _DEFAULT_CLASSIC,
    channel: int = _DEFAULT_CHANNEL,
):
    """Return the real-time status of the printer (paper, battery, heat, …)."""
    # Error mapping: PrinterNotFound -> 404, PrinterTimeout -> 504,
    # PrinterError -> 502. For BLE connections the error detail is extended
    # with troubleshooting hints.
    try:
        async with connect(_address(address), classic=classic, channel=channel) as pc:
            status = await pc.get_status()
    except PrinterNotFound as exc:
        detail = str(exc)
        # Show the BLE hints for every BLE attempt. The previous check,
        # `"BLE" in str(classic)`, inspected the text "True"/"False" and
        # could never match.
        if not classic or "BLE" in detail:
            detail += (
                "\n\nBLE Troubleshooting:\n"
                "- Ensure Home Assistant has Bluetooth permissions (host_dbus: true)\n"
                "- Make sure the printer is powered on and discoverable\n"
                "- Try restarting the printer\n"
                "- Check that no other device is connected to the printer"
            )
        raise HTTPException(status_code=404, detail=detail) from exc
    except PrinterTimeout as exc:
        detail = str(exc)
        if not classic:  # BLE timeout
            detail += (
                "\n\nBLE Connection Tips:\n"
                "- Bring the printer closer to the Home Assistant host\n"
                "- Ensure no Bluetooth interference (WiFi, USB 3.0, microwaves)\n"
                "- Try restarting the Bluetooth service on your host"
            )
        raise HTTPException(status_code=504, detail=detail) from exc
    except PrinterError as exc:
        detail = str(exc)
        error_str = detail.lower()
        if not classic:
            # Most specific pattern first; only one hint block is appended.
            if "br-connection-not-supported" in error_str:
                detail += (
                    "\n\n🔧 HOME ASSISTANT BLE PERMISSION FIX:\n"
                    "This error occurs when Home Assistant doesn't have proper Bluetooth permissions.\n\n"
                    "📋 STEP-BY-STEP SOLUTION:\n"
                    "1. Edit your add-on configuration in Home Assistant\n"
                    "2. Add this line to the configuration:\n"
                    " host_dbus: true\n"
                    "3. Save the configuration\n"
                    "4. Restart the Fichero add-on\n"
                    "5. Try connecting again\n\n"
                    "💡 If you're using the Home Assistant OS:\n"
                    "- Go to Settings > Add-ons > Fichero Printer > Configuration\n"
                    "- Add 'host_dbus: true' under the 'host_dbus' section\n"
                    "- This gives the add-on access to the system Bluetooth stack\n\n"
                    "🔄 ALTERNATIVE SOLUTION:\n"
                    "If BLE still doesn't work after adding host_dbus, try Classic Bluetooth:\n"
                    "1. Set 'classic=true' in your API calls\n"
                    "2. Use channel=1 (most common for Fichero printers)\n"
                    "3. Use the 'Pair Device' button in the web UI first"
                )
            elif "dbus" in error_str:
                detail += (
                    "\n\nBLE Permission Fix:\n"
                    "1. Add 'host_dbus: true' to your add-on configuration\n"
                    "2. Restart the add-on\n"
                    "3. If using Classic Bluetooth, try: classic=true with channel=1"
                )
            elif "connection" in error_str:
                detail += (
                    "\n\nBLE Connection Help:\n"
                    "- Verify the BLE address is correct (not Classic Bluetooth address)\n"
                    "- Ensure no other device is paired/connected to the printer\n"
                    "- Try power cycling the printer"
                )
        raise HTTPException(status_code=502, detail=detail) from exc
    return {
        "ok": status.ok,
        "printing": status.printing,
        "cover_open": status.cover_open,
        "no_paper": status.no_paper,
        "low_battery": status.low_battery,
        "overheated": status.overheated,
        "charging": status.charging,
        "raw": status.raw,
    }
@app.get(
    "/info",
    summary="Get printer info",
    response_description="Model, firmware, serial number and battery level",
)
async def get_info(
    address: str | None = None,
    classic: bool = _DEFAULT_CLASSIC,
    channel: int = _DEFAULT_CHANNEL,
):
    """Return static and dynamic information reported by the printer."""
    # Error mapping: PrinterNotFound -> 404, PrinterTimeout -> 504,
    # PrinterError -> 502.
    target = _address(address)
    try:
        async with connect(target, classic=classic, channel=channel) as pc:
            info = await pc.get_info()
            # Merge the extended info into the basic info mapping.
            extra = await pc.get_all_info()
            info.update(extra)
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    return info
@app.get(
    "/scan",
    summary="Scan for BLE devices",
    response_description="List of discovered BLE devices",
)
async def scan_devices():
    """Scan for nearby BLE devices for 10 seconds for debugging."""
    try:
        devices = await BleakScanner.discover(timeout=10.0)
        result = []
        for d in devices:
            device_info = {
                "address": d.address,
                "name": d.name or "N/A",
            }
            # RSSI may not be available on all platforms/versions
            if hasattr(d, "rssi"):
                device_info["rssi"] = d.rssi
            if hasattr(d, "metadata"):
                # Stringified so the value is always JSON-serialisable.
                device_info["metadata"] = str(d.metadata)
            result.append(device_info)
        return result
    except Exception as exc:
        # Top-level boundary: surface the failure to the caller for debugging
        # and chain the cause so the original traceback is kept.
        raise HTTPException(
            status_code=500, detail=f"An error occurred during BLE scanning: {exc}"
        ) from exc
# Strict "XX:XX:XX:XX:XX:XX" pattern. Used with fullmatch() so that a trailing
# newline is rejected (re.match with "$" would accept one).
_MAC_ADDRESS_RE = re.compile(r"[0-9A-F]{2}(?::[0-9A-F]{2}){5}", re.IGNORECASE)


def _require_mac_address(address: str | None, action: str) -> str:
    """Resolve the effective address and validate it, or raise a 422."""
    addr = _address(address)
    if not addr:
        raise HTTPException(status_code=422, detail=f"Address is required to {action}.")
    # Basic validation for MAC address format to mitigate injection risk.
    if not _MAC_ADDRESS_RE.fullmatch(addr):
        raise HTTPException(status_code=422, detail=f"Invalid address format: {addr}")
    return addr


async def _run_bluetoothctl(commands: list[str]) -> tuple[str, str]:
    """Feed *commands* (followed by ``quit``) to ``bluetoothctl`` on stdin.

    The program is started directly rather than through a shell pipeline, so
    the result does not depend on how the system shell implements ``echo -e``
    and a missing binary is reported as ``FileNotFoundError``.

    Returns:
        The decoded ``(stdout, stderr)`` of the process.

    Raises:
        HTTPException: 500 when ``bluetoothctl`` cannot be found, 504 when it
            does not finish within 15 seconds.
    """
    script = ("\n".join([*commands, "quit"]) + "\n").encode()
    try:
        proc = await asyncio.create_subprocess_exec(
            "bluetoothctl",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except FileNotFoundError as exc:
        raise HTTPException(
            status_code=500,
            detail="`bluetoothctl` command not found. Is BlueZ installed and in PATH?",
        ) from exc
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(script), timeout=15.0)
    except asyncio.TimeoutError as exc:
        # Do not leave a stuck bluetoothctl behind: kill and reap it.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # the process exited on its own in the meantime
        await proc.wait()
        raise HTTPException(
            status_code=504,
            detail="`bluetoothctl` command timed out after 15 seconds.",
        ) from exc
    return stdout.decode(errors="ignore"), stderr.decode(errors="ignore")


@app.post(
    "/pair",
    summary="Pair and trust a Classic Bluetooth device",
    status_code=200,
    description="⚠️ ONLY for Classic Bluetooth (RFCOMM). BLE does not require pairing!",
)
async def pair_device(
    address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None,
):
    """
    Attempt to pair and trust the device using `bluetoothctl`.
    ⚠️ IMPORTANT: This is ONLY for Classic Bluetooth (RFCOMM) connections.
    BLE connections do NOT require pairing and will NOT work with this endpoint.
    For BLE issues, ensure:
    - The printer is powered on and discoverable
    - Home Assistant has proper Bluetooth permissions (host_dbus: true)
    - You're using the correct BLE address (not Classic Bluetooth address)
    """
    addr = _require_mac_address(address, "pair")
    output, error = await _run_bluetoothctl([f"pair {addr}", f"trust {addr}"])
    # bluetoothctl reports failures in its output text, so inspect that.
    if "Failed to pair" in output or "not available" in output.lower():
        raise HTTPException(status_code=502, detail=f"Pairing failed. Output: {output}. Error: {error}")
    return {"ok": True, "message": "Pair/trust command sent. Check output for details.", "output": output, "error": error}


@app.post(
    "/unpair",
    summary="Unpair a Bluetooth device",
    status_code=200,
)
async def unpair_device(
    address: Annotated[str | None, Form(description="Device address (optional, overrides FICHERO_ADDR)")] = None,
):
    """
    Attempt to unpair the device using `bluetoothctl`.
    """
    addr = _require_mac_address(address, "unpair")
    output, error = await _run_bluetoothctl([f"remove {addr}"])
    # bluetoothctl reports failures in its output text, so inspect that.
    if "Failed to remove" in output or "not available" in output.lower():
        raise HTTPException(status_code=502, detail=f"Unpairing failed. Output: {output}. Error: {error}")
    return {"ok": True, "message": "Unpair command sent. Check output for details.", "output": output, "error": error}
@app.post(
    "/print/text",
    summary="Print a text label",
    status_code=200,
)
async def print_text(
    text: Annotated[str, Form(description="Text to print on the label")],
    density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
    paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
    copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
    font_size: Annotated[int, Form(description="Font size in points", ge=6, le=200)] = 30,
    label_length: Annotated[int | None, Form(description="Label length in mm (overrides label_height)", ge=5, le=500)] = None,
    label_height: Annotated[int, Form(description="Label height in pixels", ge=40, le=4000)] = 240,
    address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
    classic: Annotated[bool, Form(description="Use Classic Bluetooth RFCOMM")] = _DEFAULT_CLASSIC,
    channel: Annotated[int, Form(description="RFCOMM channel")] = _DEFAULT_CHANNEL,
):
    """Print a plain-text label.

    The text is rendered as a 96 px wide, 1-bit image and sent to the printer.
    """
    paper_val = _parse_paper(paper)
    # A length given in millimetres takes precedence over the pixel height.
    if label_length is None:
        max_rows = label_height
    else:
        max_rows = label_length * DOTS_PER_MM
    img = text_to_image(text, font_size=font_size, label_height=max_rows)
    # Text output is already rendered for the printer, so dithering stays off.
    job = {
        "density": density,
        "paper": paper_val,
        "copies": copies,
        "dither": False,
        "max_rows": max_rows,
    }
    # Error mapping: PrinterNotFound -> 404, PrinterTimeout -> 504,
    # PrinterError -> 502.
    try:
        async with connect(_address(address), classic=classic, channel=channel) as pc:
            ok = await do_print(pc, img, **job)
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    if ok:
        return {"ok": True, "copies": copies, "text": text}
    raise HTTPException(status_code=502, detail="Printer did not confirm completion.")
@app.post(
    "/print/image",
    summary="Print an image",
    status_code=200,
)
async def print_image(
    file: Annotated[UploadFile, File(description="Image file to print (PNG, JPEG, BMP, …)")],
    density: Annotated[int, Form(description="Print density: 0=light, 1=medium, 2=dark", ge=0, le=2)] = 2,
    paper: Annotated[str, Form(description="Paper type: gap, black, or continuous")] = "gap",
    copies: Annotated[int, Form(description="Number of copies", ge=1, le=99)] = 1,
    dither: Annotated[bool, Form(description="Apply Floyd-Steinberg dithering")] = True,
    label_length: Annotated[int | None, Form(description="Max label length in mm (overrides label_height)", ge=5, le=500)] = None,
    label_height: Annotated[int, Form(description="Max label height in pixels", ge=40, le=4000)] = 240,
    address: Annotated[str | None, Form(description="BLE address (optional, overrides FICHERO_ADDR)")] = None,
    classic: Annotated[bool, Form(description="Use Classic Bluetooth RFCOMM")] = _DEFAULT_CLASSIC,
    channel: Annotated[int, Form(description="RFCOMM channel")] = _DEFAULT_CHANNEL,
):
    """Print an uploaded image file.

    The image is resized to 96 px wide, optionally dithered to 1-bit, and sent to the printer.
    Supported formats: PNG, JPEG, BMP, GIF, TIFF, WEBP.
    """
    # No content-type check up front: Pillow rejects data it cannot decode.
    payload = await file.read()
    if not payload:
        raise HTTPException(status_code=422, detail="Uploaded file is empty.")
    try:
        img = Image.open(io.BytesIO(payload))
        img.load()  # force a full decode so broken files fail here
    except Exception as exc:
        raise HTTPException(status_code=422, detail=f"Cannot decode image: {exc}") from exc
    paper_val = _parse_paper(paper)
    # A length given in millimetres takes precedence over the pixel height.
    if label_length is None:
        max_rows = label_height
    else:
        max_rows = label_length * DOTS_PER_MM
    job = {
        "density": density,
        "paper": paper_val,
        "copies": copies,
        "dither": dither,
        "max_rows": max_rows,
    }
    # Error mapping: PrinterNotFound -> 404, PrinterTimeout -> 504,
    # PrinterError -> 502.
    try:
        async with connect(_address(address), classic=classic, channel=channel) as pc:
            ok = await do_print(pc, img, **job)
    except PrinterNotFound as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    except PrinterTimeout as exc:
        raise HTTPException(status_code=504, detail=str(exc)) from exc
    except PrinterError as exc:
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    if ok:
        return {"ok": True, "copies": copies, "filename": file.filename}
    raise HTTPException(status_code=502, detail="Printer did not confirm completion.")
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """Start the Fichero HTTP API server.

    Parses the command-line flags, stores the connection defaults where the
    request handlers read them, and runs uvicorn. Exits with status 1 when
    uvicorn is not installed.
    """
    global _DEFAULT_ADDRESS, _DEFAULT_CLASSIC, _DEFAULT_CHANNEL
    try:
        import uvicorn  # noqa: PLC0415
    except ImportError:
        print("ERROR: uvicorn is required to run the API server.")
        print("Install it with: pip install 'fichero-printer[api]'")
        raise SystemExit(1) from None
    parser = argparse.ArgumentParser(description="Fichero Printer API Server")
    parser.add_argument("--host", default="127.0.0.1", help="Bind host (default: 127.0.0.1)")
    parser.add_argument("--port", type=int, default=8765, help="Bind port (default: 8765)")
    parser.add_argument("--address", default=_DEFAULT_ADDRESS, metavar="BLE_ADDR",
                        help="Default BLE address (or set FICHERO_ADDR env var)")
    parser.add_argument("--classic", action="store_true", default=_DEFAULT_CLASSIC,
                        help="Default to Classic Bluetooth RFCOMM")
    parser.add_argument("--channel", type=int, default=_DEFAULT_CHANNEL,
                        help="Default RFCOMM channel (default: 1)")
    parser.add_argument("--reload", action="store_true", help="Enable auto-reload (development)")
    args = parser.parse_args()
    # Push CLI overrides into module-level defaults so all handlers pick them up
    _DEFAULT_ADDRESS = args.address
    _DEFAULT_CLASSIC = args.classic
    _DEFAULT_CHANNEL = args.channel
    if args.reload:
        # With --reload uvicorn re-imports "fichero.api" in a worker process,
        # which would discard the globals set above. Mirror the overrides into
        # the environment variables that the module reads at import time.
        if args.address:
            os.environ["FICHERO_ADDR"] = args.address
        else:
            os.environ.pop("FICHERO_ADDR", None)
        os.environ["FICHERO_TRANSPORT"] = "classic" if args.classic else "ble"
        os.environ["FICHERO_CHANNEL"] = str(args.channel)
    # Pass the app object directly when not reloading so that the module-level
    # globals (_DEFAULT_ADDRESS etc.) set above are visible to the handlers.
    # The string form "fichero.api:app" is required for --reload only, because
    # uvicorn's reloader needs to re-import the module in a worker process.
    uvicorn.run(
        "fichero.api:app" if args.reload else app,
        host=args.host,
        port=args.port,
        reload=args.reload,
    )


if __name__ == "__main__":
    main()