Source code for pyhap.util

import asyncio
import base64
import functools
import random
import socket
from uuid import UUID

import async_timeout
import orjson

from .const import BASE_UUID

ALPHANUM = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
HEX_DIGITS = "0123456789ABCDEF"

rand = random.SystemRandom()


[docs]def callback(func): """Decorator for non blocking functions.""" setattr(func, "_pyhap_callback", True) return func
[docs]def is_callback(func): """Check if function is callback.""" return "_pyhap_callback" in getattr(func, "__dict__", {})
[docs]def iscoro(func): """Check if the function is a coroutine or if the function is a ``functools.partial``, check the wrapped function for the same. """ if isinstance(func, functools.partial): func = func.func return asyncio.iscoroutinefunction(func)
[docs]def get_local_address() -> str: """ Grabs the local IP address using a socket. :return: Local IP Address in IPv4 format. :rtype: str """ # TODO: try not to talk 8888 for this s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: s.connect(("8.8.8.8", 80)) addr = s.getsockname()[0] finally: s.close() return str(addr)
[docs]def long_to_bytes(n): """ Convert a ``long int`` to ``bytes`` :param n: Long Integer :type n: int :return: ``long int`` in ``bytes`` format. :rtype: bytes """ byteList = [] x = 0 off = 0 while x != n: b = (n >> off) & 0xFF byteList.append(b) x = x | (b << off) off += 8 byteList.reverse() return bytes(byteList)
[docs]def generate_mac(): """ Generates a fake mac address used in broadcast. :return: MAC address in format XX:XX:XX:XX:XX:XX :rtype: str """ return "{}{}:{}{}:{}{}:{}{}:{}{}:{}{}".format( # pylint: disable=consider-using-f-string *(rand.choice(HEX_DIGITS) for _ in range(12)) )
[docs]def generate_setup_id(): """ Generates a random Setup ID for an ``Accessory`` or ``Bridge``. Used in QR codes and the setup hash. :return: 4 digit alphanumeric code. :rtype: str """ return "".join([rand.choice(ALPHANUM) for i in range(4)])
[docs]def generate_pincode(): """ Generates a random pincode. :return: pincode in format ``xxx-xx-xxx`` :rtype: bytearray """ return "{}{}{}-{}{}-{}{}{}".format( # pylint: disable=consider-using-f-string *(rand.randint(0, 9) for i in range(8)) ).encode("ascii")
[docs]def to_base64_str(bytes_input) -> str: return base64.b64encode(bytes_input).decode("utf-8")
[docs]def base64_to_bytes(str_input) -> bytes: return base64.b64decode(str_input.encode("utf-8"))
[docs]def byte_bool(boolv): return b"\x01" if boolv else b"\x00"
[docs]async def event_wait(event, timeout): """Wait for the given event to be set or for the timeout to expire. :param event: The event to wait for. :type event: asyncio.Event :param timeout: The timeout for which to wait, in seconds. :type timeout: float :return: ``event.is_set()`` :rtype: bool """ try: async with async_timeout.timeout(timeout): await event.wait() except asyncio.TimeoutError: pass return event.is_set()
[docs]@functools.lru_cache(maxsize=2048) def uuid_to_hap_type(uuid): """Convert a UUID to a HAP type.""" long_type = str(uuid).upper() if not long_type.endswith(BASE_UUID): return long_type return long_type.split("-", 1)[0].lstrip("0")
[docs]@functools.lru_cache(maxsize=2048) def hap_type_to_uuid(hap_type): """Convert a HAP type to a UUID.""" if "-" in hap_type: return UUID(hap_type) return UUID("0" * (8 - len(hap_type)) + hap_type + BASE_UUID)
[docs]def to_hap_json(dump_obj): """Convert an object to HAP json.""" return orjson.dumps(dump_obj) # pylint: disable=no-member
[docs]def to_sorted_hap_json(dump_obj): """Convert an object to sorted HAP json.""" return orjson.dumps( # pylint: disable=no-member dump_obj, option=orjson.OPT_SORT_KEYS # pylint: disable=no-member )
[docs]def from_hap_json(json_str): """Convert json to an object.""" return orjson.loads(json_str) # pylint: disable=no-member