communication.pool module

This module contains the Pool class. The Pool is an interface (and implementation) of a generic “communication pool”. A communication pool is intended to be a small peer-to-peer network that allows for point-to-point and broadcast communication.

class communication.pool.Pool(name, communicator, packer=None, *, timeout=3)[source]

Bases: AbstractAsyncContextManager[Pool]

Implements a generic communication pool intended for use by other modules in the PET Lab.

The Pool interface exposes a simple communication pool, i.e. a peer-to-peer network of nodes (referred to as ‘clients’) allowing for point-to-point and broadcast communication. The Pool interface can be used to easily send and receive messages to and from other clients in the pool.

The Pool provides the following features: * addressing: simply address a client by its string identifier * message buffering: read/receive the messages in the order of your choosing by receiving messages by id (pool.recv(sender, msg_id=’identifier’)) * benchmarking: automatically track your bandwidth usage

The Pool relies on two other interfaces: * The Communicator implements the actual “over-the-wire” communication. This allows you to easily change the network protocol/implementation of your Pool (e.g. Pool(communicator=MockCommunicator). * The Packer is used to serialize your python objects into bytes, allowing one to send arbitrary Python objects.

async __aenter__()[source]

Enter the runtime context for the Pool, running the initialize method.

Return type:

Pool

Returns:

The Pool object.

async __aexit__(exc_type, exc_val, exc_tb)[source]

Exit the runtime context for the Pool, running the shutdown method.

Parameters:
  • exc_type (type[BaseException] | None) – Exception type.

  • exc_val (BaseException | None) – Exception value.

  • exc_tb (TracebackType | None) – Exception traceback.

Return type:

bool

__init__(name, communicator, packer=None, *, timeout=3)[source]

Initialises a pool.

Parameters:
  • name (str) – The name under which this Pool is known to other Clients in the Pool.

  • communicator (Communicator[Any]) – A communicator object to use for communication.

  • packer (Packer | None) – Optional argument to specify a Packer to serialize the messages. Defaults to DefaultPacker if not provided.

  • timeout (float | None) – The default maximum time in seconds to wait for a message to be received. When the timeout passes, an error is thrown. If None, receive calls will wait indefinitely.

add_client(name, connection)[source]

Add a client to the Pool.

Parameters:
  • name (str) – The name of the client. You can pick any name as long as it is unique within the Pool. The name does not need to have any relation to the (network) configuration of the client, and acts purely as an identifier to reference the corresponding client.

  • connection (Any) – The connection object to use for communication with the client. This object is passed to the communicator to send and receive messages.

Raises:

KeyError – If no client with the given name is registered.

Return type:

None

async broadcast(message, msg_id, recipient_names=None)[source]

Broadcast a message to all clients in the Pool.

Unless specified otherwise through handler_names, the message is broadcast to all clients in the Pool.

Parameters:
  • message (Any) – The message to send.

  • msg_id (str) – String identifying the message to send.

  • recipient_names (Iterable[str] | None) – The names of the clients to send the message to. If None, will broadcast to all known clients.

Raises:

ValueError – Raised if not all clients in the pool have been configured with the same message prefix.

Return type:

None

property clients: set[str]

Return the names of all clients in the Pool.

async initialize()[source]

Initialize and start the communication pool.

This method calls Communicator.initialize which, depending on the implementation, performs the necessary steps to setup the connections between the clients.

Return type:

None

async recv(sender_name, msg_id=None, timeout=None)[source]

Receive a message from a client in the pool.

This method returns a single message, the message specified by the given msg_id.

If the specfied message has arrived, the message will be returned. If not, a Future will be returned that is resolved once the message arrives. This method is non-blocking.

Note: One should not reuse msg_id`s, as this could result in undefined behaviour. If you really want to reuse `msg_id`s, _you_ have to ensure that either 1) the order in which the messages are received does not matter, or 2) make sure that the messages with overlapping `msg_id are only sent once the previous must have been received.

Parameters:
  • sender_name (str) – The name of the client to receive a message from.

  • msg_id (str | None) – An optional string identifying the message to collect.

  • timeout (float | None) – Maximum time in seconds to wait for the message to be received. If None, will use the timeout of the Pool. Note that the default value of the Pool timeout is None, meaning the message is awaited indefinitely.

Raises:

CommunicationError – If no message was received before timeout.

Return type:

Any

Returns:

A received message or a future for the message.

async recv_all(sender_names=None, msg_id=None, timeout=None)[source]

Receive one message for each client in the Pool (or a subset if sender_names is provided).

See also Pool.recv().

Parameters:
  • sender_names (Iterable[str] | None) – List of client names to receive a message from. If None, it will receive one message from all parties.

  • msg_id (str | None) – An optional string identifying the message to collect.

  • timeout (float | None) – Maximum time in seconds to wait for the message to be received. If None, will use the timeout of the Pool. Note that the default value of the Pool timeout is None, meaning the message is awaited indefinitely.

Raises:

TimeoutError – Message was not received before timeout.

Return type:

list[tuple[str, Any]]

Returns:

Sequence of tuples containing first the client name and second the corresponding message or future.

async send(recipient_name, message, msg_id=None)[source]

Send a message to a single client in the Pool.

The recipient_name is the destination of the message and must match the name of one of the Client`s in the `Pool.

Parameters:
  • recipient_name (str) – The name of the pool handler to send a message to.

  • message (Any) – The message to send.

  • msg_id (str | None) – An optional string identifying the message to send.

Return type:

None

async shutdown()[source]

Gracefully shutdown all connections/listeners in the pool.

Return type:

None