Source code for communication.communicators.communicator
"""
This module contains the abstract classes for Communicators and Clients.
"""
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from collections.abc import Awaitable
from enum import IntEnum, auto
from typing import Callable, Generic, TypeVar
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Types
# Type variable for the connection objects a communicator manages; it
# parameterizes `Communicator` and is the value type of its connection registry.
Connection = TypeVar("Connection")
# Signature of a receive handler: a callable taking a `str` and a `bytes`
# payload and returning an awaitable that resolves to None.
# NOTE(review): the `str` is presumably the name of the connection the packet
# came from -- confirm against the concrete communicators.
ReceiveHandlerType = Callable[[str, bytes], Awaitable[None]]
# State
[docs]
class CommunicatorState(IntEnum):
    """
    Lifecycle states of a `Communicator`.

    A freshly constructed Communicator is `UNINITIALIZED`. Calling
    `Communicator.initialize` moves it to `STARTED`, and calling
    `Communicator.shutdown` moves it from `STARTED` to `STOPPED`.
    """

    # State right after construction; connections can still be registered.
    UNINITIALIZED = auto()
    # Set by `Communicator.initialize`; the only state in which sending is allowed.
    STARTED = auto()
    # Set by `Communicator.shutdown`.
    STOPPED = auto()
[docs]
class IllegalStateAction(Exception):
    """
    Raised when an operation is attempted that the current lifecycle state
    does not permit.
    """

    def __init__(self, action: str, curr_state: CommunicatorState) -> None:
        """
        Build the exception message from the rejected action and the state.

        :param action: Name of the operation that was rejected.
        :param curr_state: State at the moment of the attempt; only its
            `name` ends up in the message.
        """
        message = f"Illegal: Cannot perform '{action}' while state is {curr_state.name}."
        super().__init__(message)
[docs]
class Communicator(Generic[Connection], ABC):
    """
    An abstract interface for sending and receiving packets of bytes to
    registered connections.

    A Communicator has "connections" which describe how to reach a specific
    endpoint. These connections are added through
    `Communicator.add_connection`. Each connection has a unique name, which is
    used to address the connection when sending packets to it, or to identify
    the connection when receiving packets from it.

    To be able to receive packets from connections through the Communicator,
    one must register a `receive_handler` using
    `Communicator.set_receive_handler`. This `receive_handler` is called
    for each packet that is received.

    The public methods enforce the lifecycle described by
    `CommunicatorState` and delegate the actual work to the abstract hooks
    `_initialize`, `_send` and `_shutdown`, which subclasses must implement.
    """

    def __init__(self) -> None:
        """
        Initializer for the Communicator.

        Starts without a receive handler, without connections, and in the
        `UNINITIALIZED` state.
        """
        self._receive_handler: ReceiveHandlerType | None = None
        self._connections: dict[str, Connection] = {}
        self._state = CommunicatorState.UNINITIALIZED

    def set_receive_handler(self, receive_handler: ReceiveHandlerType) -> None:
        """
        Register the `receive_handler` to be called upon receiving a message.

        Any previously registered handler is replaced. This method performs no
        state check.

        :param receive_handler: The handler to call for received packets.
        """
        self._receive_handler = receive_handler

    @property
    def receive_handler(self) -> ReceiveHandlerType:
        """
        The handler to be called upon receiving a new packet.

        :raise AttributeError: If no receive_handler is set.
        :return: A handler function which accepts packets.
        """
        if self._receive_handler is None:
            raise AttributeError("The receive handler is not set.")
        return self._receive_handler

    def add_connection(self, name: str, connection: Connection) -> None:
        """
        Register a new connection to the communicator.

        By registering a connection to the communicator, the communicator can
        send and receive messages through the connection. Each connection needs
        to be identified by a unique `name`.

        :param name: The name of the connection to register.
        :param connection: The connection to register.
        :raise IllegalStateAction: If the state is not UNINITIALIZED.
        :raise KeyError: If a connection with the given name already exists.
        """
        if self._state != CommunicatorState.UNINITIALIZED:
            raise IllegalStateAction("add_connection", self._state)
        if name in self._connections:
            raise KeyError(f"Connection with name '{name}' already registered!")
        self._connections[name] = connection

    async def initialize(self) -> None:
        """
        Initialize and start the Communicator.

        Functionality depends on implementation, but often some (asynchronous)
        setup steps are required to setup the connections.

        :raise IllegalStateAction: If the state is not UNINITIALIZED.
        """
        if self._state != CommunicatorState.UNINITIALIZED:
            raise IllegalStateAction("initialize", self._state)
        # NOTE(review): the state is switched before the hook is awaited, so a
        # failing `_initialize` leaves the communicator marked STARTED --
        # confirm this is intended.
        self._state = CommunicatorState.STARTED
        await self._initialize()

    @abstractmethod
    async def _initialize(self) -> None:
        """
        Implementation hook awaited by `initialize` after the state has been
        set to `STARTED`.
        """

    async def send(self, recipient: str, packet: bytes) -> None:
        """
        Send a message `packet` to the `recipient`.

        :param recipient: The intended recipient of the message.
        :param packet: The message to send to the recipient.
        :raise IllegalStateAction: If the state is not STARTED.
        :raise KeyError: If no connection is registered with the given name.
        """
        if self._state != CommunicatorState.STARTED:
            raise IllegalStateAction("send", self._state)
        if recipient not in self._connections:
            raise KeyError(f"No connection registered with name '{recipient}'")
        await self._send(self._connections[recipient], packet)

    @abstractmethod
    async def _send(self, recipient: Connection, packet: bytes) -> None:
        """
        Implementation hook awaited by `send`.

        :param recipient: The connection object registered under the
            recipient's name (not the name itself).
        :param packet: The message to send.
        """

    async def shutdown(self) -> None:
        """
        Shutdown the communicator.

        :raise IllegalStateAction: If the state is not STARTED.
        """
        if self._state != CommunicatorState.STARTED:
            raise IllegalStateAction("shutdown", self._state)
        # As in `initialize`, the state changes before the hook is awaited.
        self._state = CommunicatorState.STOPPED
        await self._shutdown()

    @abstractmethod
    async def _shutdown(self) -> None:
        """
        Implementation hook awaited by `shutdown` after the state has been
        set to `STOPPED`.
        """