Source code for communication.buffer
"""
This module contains a Buffer class for storing incoming or expected messages.
"""
from __future__ import annotations
from typing import Generic, TypeVar
T = TypeVar("T")
[docs]
class Buffer(Generic[T]):
"""
A class to store messages in buffers for clients. It stores elements of a type T.
"""
[docs]
def __init__(self) -> None:
"""
Initialise an empty buffer.
"""
self._buffer_per_client: dict[str, dict[str, T]] = {}
[docs]
def add_client(self, client_name: str) -> None:
"""
Add a new buffer for a client with name client_name.
:param client_name: The name of the client to add.
:raise ValueError: When a buffer already exists for the client.
"""
if client_name in self._buffer_per_client:
raise ValueError(
f"A buffer already exists for client with name '{client_name}'"
)
self._buffer_per_client[client_name] = {}
[docs]
def push(self, client_name: str, msg_id: str, content: T) -> None:
"""
Add a message for a client to the buffer.
:param client_name: the client under which to store the message.
:param msg_id: The message identifier.
:param content: The content of the message.
:raise KeyError: If no buffer exists for the given client.
:raise AttributeError: If a message with msg_id already exists in the buffer.
"""
buffer = self.get_client(client_name)
if msg_id in buffer:
raise AttributeError(
"A message with this id is already present in the buffer."
)
self._buffer_per_client[client_name][msg_id] = content
[docs]
def has_buffer(self, client_name: str) -> bool:
"""
Whether a buffer exists for the given client.
:param client_name: The name of the client to check for.
:return: Whether a buffer exists for the given client.
"""
return client_name in self._buffer_per_client
[docs]
def has_message(self, client_name: str, msg_id: str | None = None) -> bool:
"""
Whether a message with given id is present in the buffer for the given
client. If no message id is specified, it returns whether any message
is in the buffer for the given client.
:param client_name: The client to check for.
:param msg_id: The message id to check the existence for.
If None, it will check whether any message is available for the
given client.
:return: Boolean indicating whether the given / any message is
available for the client.
"""
if msg_id:
return msg_id in self._buffer_per_client[client_name]
return bool(self._buffer_per_client[client_name])
[docs]
def pop(self, client_name: str, msg_id: str) -> T:
"""
Pop a message with given id from the client's buffer.
:param client_name: The client for which the message must be popped.
:param msg_id: The identifier for the message.
:raise KeyError: If no buffer exists for the given client.
:raise KeyError: If the message with given ID does not exist.
:return: The content of the message.
"""
return self.get_client(client_name).pop(msg_id)
[docs]
def get_client(self, client_name: str) -> dict[str, T]:
"""
Return the buffer for the given client.
:param client_name: The name of the client for which to return the
buffer.
:raise KeyError: If the specified client does not have a buffer.
:return: Either the specified buffer or all the buffers.
"""
if not self.has_buffer(client_name):
raise KeyError("No buffer exists for the given client.")
return self._buffer_per_client[client_name]
[docs]
def empty(self, client_name: str | None = None) -> None:
"""
Clear the buffer for the given client. If no client is specified, clear
all buffers. If the client does not have a buffer, a new buffer will be
created.
:param client_name: If specified, clear the buffer for this client. If
not specified, clear all buffers.
"""
if client_name is not None:
self._buffer_per_client[client_name] = {}
return
for _client_name in self._buffer_per_client:
self._buffer_per_client[_client_name] = {}