Fix text/image alignment, add dithering, extend Classic BT to Windows
This commit is contained in:
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
281
tests/test_rfcomm.py
Normal file
281
tests/test_rfcomm.py
Normal file
@@ -0,0 +1,281 @@
|
||||
"""Tests for the RFCOMM (Classic Bluetooth) transport layer."""
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from fichero.printer import (
|
||||
RFCOMM_CHANNEL,
|
||||
PrinterClient,
|
||||
PrinterError,
|
||||
RFCOMMClient,
|
||||
connect,
|
||||
)
|
||||
|
||||
|
||||
# --- RFCOMMClient unit tests ---
|
||||
|
||||
|
||||
class TestRFCOMMClientInit:
|
||||
def test_defaults(self):
|
||||
c = RFCOMMClient("AA:BB:CC:DD:EE:FF")
|
||||
assert c._address == "AA:BB:CC:DD:EE:FF"
|
||||
assert c._channel == RFCOMM_CHANNEL
|
||||
assert c._sock is None
|
||||
assert c._reader_task is None
|
||||
|
||||
def test_custom_channel(self):
|
||||
c = RFCOMMClient("AA:BB:CC:DD:EE:FF", channel=3)
|
||||
assert c._channel == 3
|
||||
|
||||
|
||||
class TestRFCOMMClientPlatformGuard:
|
||||
@pytest.mark.asyncio
|
||||
async def test_raises_on_unavailable_platform(self):
|
||||
with patch("fichero.printer._RFCOMM_AVAILABLE", False):
|
||||
client = RFCOMMClient("AA:BB:CC:DD:EE:FF")
|
||||
with pytest.raises(PrinterError, match="requires socket.AF_BLUETOOTH"):
|
||||
async with client:
|
||||
pass
|
||||
|
||||
|
||||
class TestRFCOMMClientConnect:
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_and_close(self):
|
||||
mock_sock = MagicMock()
|
||||
mock_sock.close = MagicMock()
|
||||
|
||||
with (
|
||||
patch("fichero.printer._RFCOMM_AVAILABLE", True),
|
||||
patch("fichero.printer.RFCOMMClient.__aenter__") as mock_enter,
|
||||
patch("fichero.printer.RFCOMMClient.__aexit__") as mock_exit,
|
||||
):
|
||||
client = RFCOMMClient("AA:BB:CC:DD:EE:FF")
|
||||
mock_enter.return_value = client
|
||||
mock_exit.return_value = None
|
||||
client._sock = mock_sock
|
||||
|
||||
async with client:
|
||||
assert client._sock is mock_sock
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_socket_closed_on_connect_failure(self):
|
||||
"""If sock_connect fails, the socket must be closed."""
|
||||
mock_sock = MagicMock()
|
||||
mock_sock.setblocking = MagicMock()
|
||||
mock_sock.close = MagicMock()
|
||||
|
||||
mock_socket_mod = MagicMock()
|
||||
mock_socket_mod.AF_BLUETOOTH = 31
|
||||
mock_socket_mod.SOCK_STREAM = 1
|
||||
mock_socket_mod.BTPROTO_RFCOMM = 3
|
||||
mock_socket_mod.socket.return_value = mock_sock
|
||||
|
||||
async def fail_connect(sock, addr):
|
||||
raise ConnectionRefusedError("refused")
|
||||
|
||||
with (
|
||||
patch("fichero.printer._RFCOMM_AVAILABLE", True),
|
||||
patch.dict("sys.modules", {"socket": mock_socket_mod}),
|
||||
):
|
||||
client = RFCOMMClient("AA:BB:CC:DD:EE:FF")
|
||||
loop = asyncio.get_running_loop()
|
||||
with patch.object(loop, "sock_connect", fail_connect):
|
||||
with pytest.raises(ConnectionRefusedError):
|
||||
await client.__aenter__()
|
||||
mock_sock.close.assert_called_once()
|
||||
|
||||
|
||||
class TestRFCOMMClientIO:
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_gatt_char_sends_data(self):
|
||||
client = RFCOMMClient("AA:BB:CC:DD:EE:FF")
|
||||
client._sock = MagicMock()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
with patch.object(loop, "sock_sendall", new_callable=AsyncMock) as mock_send:
|
||||
await client.write_gatt_char("ignored-uuid", b"\x10\xff\x40")
|
||||
mock_send.assert_called_once_with(client._sock, b"\x10\xff\x40")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_write_gatt_char_ignores_uuid_and_response(self):
|
||||
client = RFCOMMClient("AA:BB:CC:DD:EE:FF")
|
||||
client._sock = MagicMock()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
with patch.object(loop, "sock_sendall", new_callable=AsyncMock) as mock_send:
|
||||
await client.write_gatt_char("any-uuid", b"\xAB", response=True)
|
||||
mock_send.assert_called_once_with(client._sock, b"\xAB")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_notify_launches_reader(self):
|
||||
client = RFCOMMClient("AA:BB:CC:DD:EE:FF")
|
||||
client._sock = MagicMock()
|
||||
|
||||
callback = MagicMock()
|
||||
|
||||
# Mock sock_recv to return data once then empty (EOF)
|
||||
loop = asyncio.get_running_loop()
|
||||
call_count = 0
|
||||
|
||||
async def mock_recv(sock, size):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count == 1:
|
||||
return b"\x01\x02"
|
||||
return b""
|
||||
|
||||
with patch.object(loop, "sock_recv", mock_recv):
|
||||
await client.start_notify("ignored-uuid", callback)
|
||||
assert client._reader_task is not None
|
||||
# Let the reader loop run
|
||||
await client._reader_task
|
||||
|
||||
callback.assert_called_once_with(None, bytearray(b"\x01\x02"))
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reader_loop_handles_oserror(self):
|
||||
client = RFCOMMClient("AA:BB:CC:DD:EE:FF")
|
||||
client._sock = MagicMock()
|
||||
callback = MagicMock()
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
async def mock_recv(sock, size):
|
||||
raise OSError("socket closed")
|
||||
|
||||
with patch.object(loop, "sock_recv", mock_recv):
|
||||
await client.start_notify("uuid", callback)
|
||||
await client._reader_task
|
||||
|
||||
callback.assert_not_called()
|
||||
|
||||
|
||||
class TestRFCOMMClientExit:
|
||||
@pytest.mark.asyncio
|
||||
async def test_exit_cancels_reader_and_closes_socket(self):
|
||||
client = RFCOMMClient("AA:BB:CC:DD:EE:FF")
|
||||
mock_sock = MagicMock()
|
||||
client._sock = mock_sock
|
||||
|
||||
# Create a long-running task to cancel
|
||||
async def hang_forever():
|
||||
await asyncio.sleep(999)
|
||||
|
||||
client._reader_task = asyncio.create_task(hang_forever())
|
||||
|
||||
await client.__aexit__(None, None, None)
|
||||
|
||||
assert client._sock is None
|
||||
assert client._reader_task is None
|
||||
mock_sock.close.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_exit_no_reader_no_socket(self):
|
||||
"""Exit is safe even if never connected."""
|
||||
client = RFCOMMClient("AA:BB:CC:DD:EE:FF")
|
||||
await client.__aexit__(None, None, None)
|
||||
assert client._sock is None
|
||||
assert client._reader_task is None
|
||||
|
||||
|
||||
# --- connect() integration tests ---
|
||||
|
||||
|
||||
class TestConnectClassic:
|
||||
@pytest.mark.asyncio
|
||||
async def test_classic_requires_address(self):
|
||||
with pytest.raises(PrinterError, match="--address is required"):
|
||||
async with connect(classic=True):
|
||||
pass
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_classic_uses_rfcomm_client(self):
|
||||
mock_rfcomm = AsyncMock()
|
||||
mock_rfcomm.__aenter__ = AsyncMock(return_value=mock_rfcomm)
|
||||
mock_rfcomm.__aexit__ = AsyncMock(return_value=None)
|
||||
mock_rfcomm.start_notify = AsyncMock()
|
||||
|
||||
with patch("fichero.printer.RFCOMMClient", return_value=mock_rfcomm) as mock_cls:
|
||||
async with connect("AA:BB:CC:DD:EE:FF", classic=True, channel=3) as pc:
|
||||
assert isinstance(pc, PrinterClient)
|
||||
mock_cls.assert_called_once_with("AA:BB:CC:DD:EE:FF", 3)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ble_path_unchanged(self):
|
||||
"""classic=False still uses BleakClient."""
|
||||
mock_bleak = AsyncMock()
|
||||
mock_bleak.__aenter__ = AsyncMock(return_value=mock_bleak)
|
||||
mock_bleak.__aexit__ = AsyncMock(return_value=None)
|
||||
mock_bleak.start_notify = AsyncMock()
|
||||
|
||||
with patch("fichero.printer.BleakClient", return_value=mock_bleak) as mock_cls:
|
||||
async with connect("AA:BB:CC:DD:EE:FF", classic=False) as pc:
|
||||
assert isinstance(pc, PrinterClient)
|
||||
mock_cls.assert_called_once_with("AA:BB:CC:DD:EE:FF")
|
||||
|
||||
|
||||
# --- CLI arg parsing tests ---
|
||||
|
||||
|
||||
class TestCLIArgs:
|
||||
def test_classic_flag_default_false(self):
|
||||
from fichero.cli import main
|
||||
import argparse
|
||||
|
||||
with patch("argparse.ArgumentParser.parse_args") as mock_parse:
|
||||
mock_parse.return_value = argparse.Namespace(
|
||||
address=None, classic=False, channel=1,
|
||||
command="status", func=AsyncMock(),
|
||||
)
|
||||
# Just verify the parser accepts --classic
|
||||
from fichero.cli import main as cli_main
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--classic", action="store_true", default=False)
|
||||
parser.add_argument("--channel", type=int, default=1)
|
||||
args = parser.parse_args([])
|
||||
assert args.classic is False
|
||||
assert args.channel == 1
|
||||
|
||||
def test_classic_flag_set(self):
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--classic", action="store_true", default=False)
|
||||
parser.add_argument("--channel", type=int, default=1)
|
||||
args = parser.parse_args(["--classic", "--channel", "5"])
|
||||
assert args.classic is True
|
||||
assert args.channel == 5
|
||||
|
||||
def test_env_var_transport(self):
|
||||
import argparse
|
||||
|
||||
with patch.dict("os.environ", {"FICHERO_TRANSPORT": "classic"}):
|
||||
parser = argparse.ArgumentParser()
|
||||
import os
|
||||
parser.add_argument(
|
||||
"--classic", action="store_true",
|
||||
default=os.environ.get("FICHERO_TRANSPORT", "").lower() == "classic",
|
||||
)
|
||||
args = parser.parse_args([])
|
||||
assert args.classic is True
|
||||
|
||||
|
||||
# --- Exports ---
|
||||
|
||||
|
||||
class TestExports:
|
||||
def test_rfcomm_client_exported(self):
|
||||
from fichero import RFCOMMClient as RC
|
||||
assert RC is RFCOMMClient
|
||||
|
||||
def test_rfcomm_channel_exported(self):
|
||||
from fichero import RFCOMM_CHANNEL as CH
|
||||
assert CH == 1
|
||||
|
||||
def test_all_contains_new_symbols(self):
|
||||
import fichero
|
||||
assert "RFCOMMClient" in fichero.__all__
|
||||
assert "RFCOMM_CHANNEL" in fichero.__all__
|
||||
Reference in New Issue
Block a user