Source code for nat20

"""
Primary interface to Pixels dice.

::

    async for scan in scan_for_dice():
        break

    async with scan.connect() as die:
        die.blink_id(0x80)
"""
import asyncio
from collections.abc import AsyncGenerator, AsyncIterable
import contextlib
import dataclasses
import datetime
import enum
import logging
import struct
from types import EllipsisType
from typing import Self

import aioevents
import bleak

from .constants import SERVICE_PIXELS, SERVICE_INFO
from .link import PixelLink
from .messages import (
    DesignAndColor, WhoAreYou, IAmADie, DieFlavor,
    RequestRollState, RollState, RollState_State,
    RequestBatteryLevel, BatteryLevel, BatteryState,
    Blink, BlinkAck, BlinkId, BlinkIdAck,
    StopAllAnimations,
    RequestMode, RequestRssi, Rssi,
    SetName, SetNameAck,
    NotifyUser, NotifyUserAck, OkCancel,
    Calibrate, CalibrateFace,
    RequestTemperature, Temperature,
)  # Also, import .messages so everything gets registered

LOG = logging.getLogger(__name__)

# Since these are protocol definitions, I would prefer to use explicit numbers
# in enums, but none of the first-party code does that.


[docs]class ScanBattState(enum.IntEnum): """ The charge state of the battery. """ #: The battery is discharging Ok = 0 #: The battery is charging Charging = 1
[docs] def as_batterystate(self) -> BatteryState: """ Repackage as a :class:`BatteryState`. Note that due to data fidelity, the state can only be :attr:`.BatteryState.Ok` or :attr:`.BatteryState.Charging`. """ match self: case ScanBattState.Ok: return BatteryState.Ok case ScanBattState.Charging: return BatteryState.Charging case _: # Can't happen, but covering bases return BatteryState.Error
[docs]@dataclasses.dataclass class ScanResult: _device: bleak.backends.device.BLEDevice #: The name of the die name: str #: The number of LEDs and faces led_count: int #: The aesthetic design of the die design_and_color: DesignAndColor #: The motion of the die roll_state: RollState_State #: The current face (starting at 0) roll_face: int #: The charge state of the battery batt_state: ScanBattState #: The level of the battery, as a percent batt_level: int #: The unique ID of the die pixel_id: int #: The build date of the firmware build_timestamp: datetime.datetime @property def flavor(self) -> DieFlavor: """ The kind of die this is, like D20 or Pipped D6 """ return DieFlavor._from_led_count(self.led_count) @property def face_count(self) -> int: """ The total number of faces """ return self.flavor.face_count @classmethod def _construct(cls, device, name, mdata, sdata): led_count, design, roll_state, face, batt = struct.unpack( "<BBBBB", mdata) id, build = struct.unpack("<II", sdata) build = datetime.datetime.fromtimestamp( build, tz=datetime.timezone.utc) return cls( _device=device, name=name, led_count=led_count, design_and_color=DesignAndColor(design), roll_state=RollState_State(roll_state), roll_face=face, batt_state=ScanBattState(batt >> 7), batt_level=batt & 0x7F, pixel_id=id, build_timestamp=build, )
[docs] def to_rollstate(self) -> 'RollState': """ Repackages the rolling information. """ return RollState( state=self.roll_state, face=self.roll_face, )
[docs] def to_batterylevel(self) -> 'BatteryLevel': """ Repackages the battery information. Note that due to data fidelity, the state can only be :attr:`.BatteryState.Ok` or :attr:`.BatteryState.Charging`. """ return BatteryLevel( state=self.batt_state.as_batterystate(), level=self.batt_level, )
[docs] def hydrate(self) -> 'Pixel': """ Constructs a full Pixel class for this die. """ return Pixel(self)
[docs]async def scan_for_dice() -> AsyncIterable[ScanResult]: """ Search for dice. Will scan forever as long as the iterator is live. For timeouts, :func:`asyncio.timeout` might be helpful. """ q = asyncio.Queue() def detected(device, ad_data): if ( 0xFFFF in ad_data.manufacturer_data and SERVICE_INFO in ad_data.service_data ): sr = ScanResult._construct( device, ad_data.local_name, ad_data.manufacturer_data[0xFFFF], ad_data.service_data[SERVICE_INFO], ) q.put_nowait(sr) scanner = bleak.BleakScanner( detection_callback=detected, service_uuids=[SERVICE_PIXELS], ) async with scanner: while True: yield await q.get()
[docs]class Pixel: """ Class for a pixel die. Do not construct directly, use :func:`scan_for_dice` to find the die you want and then use :meth:`ScanResult.connect` to get an instance. """ # The requirement to use scan_for_dice() is because while bleak does # support connecting by address, it internally just does a scan anyway, # so might as well make that part of the data model. And then users can # scan by name or ID or whatever. #: The textual name of the die. name: str #: Number of LEDs led_count: int #: The aesthetic design of the die design_and_color: DesignAndColor #: The factory-assigned die ID pixel_id: int #: Timestamp of when the firmware was built. build_timestamp: datetime.datetime #: Current roll state roll_state: RollState_State #: Current face that's up, starting at 0. Validity depends on :attr:`roll_state`. roll_face: int #: Current battery level as a percent batt_level: int #: Current battery percent batt_state: BatteryState _expected_disconnect: bool = False _link: PixelLink got_roll_state = aioevents.Event("(rs: RollState) A new RollState has been sent.") got_battery_level = aioevents.Event("(bl: BatteryLevel) A new BatteryState has been sent.") data_changed = aioevents.Event( "(cl: set[str]) Any of the props changed, giving the set of which ones" ) disconnected = aioevents.Event("() We've been unexpectedly disconnected from the die.") notify_user = aioevents.Event( "(cb: Callable[[OkCancel], None]) The die has something to tell the user." ) @property def flavor(self) -> DieFlavor: """ The kind of die this is, like D20 or Pipped D6 """ return DieFlavor._from_led_count(self.led_count) @property def face_count(self) -> int: """ The total number of faces """ return self.flavor.face_count
[docs] def to_rollstate(self) -> RollState: """ Repackages the rolling information. """ return RollState( state=self.roll_state, face=self.roll_face, )
[docs] def to_batterylevel(self) -> BatteryLevel: """ Repackages the battery information. Note that updated information hasn't been received, the state can only be :attr:`.BatteryState.Ok` or :attr:`.BatteryState.Charging`. """ return BatteryLevel( state=self.batt_state, level=self.batt_level, )
def __init__(self, sr: ScanResult): """ Use :meth:`ScanResult.connect` instead. :meta private: """ self.name = sr.name self.led_count = sr.led_count self.design_and_color = sr.design_and_color self.pixel_id = sr.pixel_id self.build_timestamp = sr.build_timestamp self.roll_state = sr.roll_state self.roll_face = sr.roll_face self.batt_level = sr.batt_level self.batt_state = sr.batt_state.as_batterystate() self._device = sr._device self._link = PixelLink(bleak.BleakClient( sr._device, services=[SERVICE_INFO, SERVICE_PIXELS], disconnected_callback=lambda c: asyncio.create_task( self._on_disconnect(c)), )) self._link._message_handlers[RollState].append(self._on_roll_state) self._link._message_handlers[BatteryLevel].append(self._on_battery_level) self._link._message_handlers[NotifyUser].append(self._on_notify_user)
[docs] async def connect(self): """ Connect to die """ self._expected_disconnect = False await self._link.connect()
[docs] async def disconnect(self): """ Disconnect from die """ self._expected_disconnect = True await self._link.disconnect()
[docs] @contextlib.asynccontextmanager async def connect_with_reconnect(self) -> AsyncGenerator[Self, None]: """ Connect to the die and make an effort to automatically reconnect. """ @self.disconnected.handler async def reconnect(_): await self.connect() # TODO: Forward error? await self.connect() try: yield self finally: self.disconnected.remove(reconnect) await self.disconnect()
async def _on_disconnect(self, client): if not self._expected_disconnect: LOG.info("Disconnected from %r", client) self.disconnected.trigger() def _on_roll_state(self, msg: RollState): self.roll_state = msg.state self.roll_face = msg.face self.data_changed.trigger({'roll_state', 'roll_face'}) self.got_roll_state.trigger(msg) def _on_battery_level(self, msg: BatteryLevel): self.batt_state = msg.state self.batt_level = msg.level self.data_changed.trigger({'batt_state', 'batt_level'}) self.got_battery_level.trigger(msg) def _on_notify_user(self, msg: NotifyUser): def respond(resp: OkCancel) -> asyncio.Future | asyncio.Task: return asyncio.create_task(self._link.send(NotifyUserAck(resp))) self.notify_user.trigger(msg.text, msg.ok, msg.cancel, msg.timeout, respond) def __repr__(self): return ( f"<{type(self).__name__} " f"{self.address} {self.name!r} is_connected={self.is_connected}" f">" ) @property def address(self) -> str: """ The MAC address (UUID on macOS) of the die. """ return self._link.address @property def is_connected(self) -> bool: """ Are we currently connected to the die? """ return self._link.is_connected
[docs] async def who_are_you(self) -> IAmADie: """ Perform a basic info query """ msg = await self._link.send_and_wait(WhoAreYou(), IAmADie) self.roll_state = msg.roll_state self.roll_face = msg.roll_face self.batt_state = msg.batt_state self.batt_level = msg.batt_level # These ones shouldn't change, but we're going to include them anyway. self.build_timestamp = msg.build_timestamp self.design_and_color = msg.design_and_color self.led_count = msg.led_count self.pixel_id = msg.pixel_id self.data_changed.trigger({ 'roll_state', 'roll_face', 'batt_state', 'batt_level', 'build_timestamp', 'design_and_color', 'pixel_id', }) return msg
async def what_do_you_want(self): """ Companion to :meth:`.who_are_you` :meta private: """ # Babylon 5, Vir Cotto to Mr Morden return ( "I'd like to live just long enough to be there when they cut off " "your head and stick it on a pike as a warning to the next ten " "generations that some favors come with too high a price. I would " "look up at your lifeless eyes and wave like this." )
[docs] async def get_roll_state(self) -> RollState: """ Request the current roll state. """ msg = await self._link.send_and_wait(RequestRollState(), RollState) self.roll_state = msg.state self.roll_face = msg.face self.data_changed.trigger({'roll_state', 'roll_face'}) return msg
[docs] async def get_battery_level(self) -> BatteryLevel: """ Request the current battery level. """ msg = await self._link.send_and_wait(RequestBatteryLevel(), BatteryLevel) self.batt_level = msg.level self.batt_state = msg.state self.data_changed.trigger({'batt_level', 'batt_state'}) return msg
[docs] async def stop_all_animations(self) -> None: """ Stops any animations currently running, regardless of source. """ # Nope, no acknowledgement packet await self._link.send(StopAllAnimations())
[docs] async def get_rssi(self) -> int: """ Request the RSSI the die sees. """ msg = await self._link.send_and_wait(RequestRssi( request_mode=RequestMode.Once, min_interval=0, ), Rssi) return msg.rssi
[docs] async def set_name(self, name: str) -> None: """ Change the name of the die. """ await self._link.send_and_wait(SetName(name=name), SetNameAck) self.name = name self.data_changed.trigger({'name'})
[docs] async def start_calibration(self) -> None: """ Start the calibration process. Note that this method only _starts_ it. Interaction will occur via the :attr:`notify_user` event. The die will not function normally until the calibration is completed or times out. """ await self._link.send(Calibrate())
[docs] async def calibrate_face(self, face: int) -> None: """ Calibrate a specific face. The die must be resting on a flat, level surface with the noted face up. """ await self._link.send(CalibrateFace(face))
[docs] async def get_temperature(self) -> tuple[float, float]: """ Get the current temperatue of the die microcontroller and battery, in degrees Celsius. """ temps = await self._link.send_and_wait(RequestTemperature(), Temperature) return temps.mcu_temp / 100, temps.batt_temp / 100