communication.pool module

This module contains the Pool class, which is used to communicate between parties.

class communication.pool.Pool(key=None, cert=None, ca_cert=None, timeout=ClientTimeout(total=300, connect=None, sock_read=None, sock_connect=None, ceil_threshold=5), max_retries=-1)[source]

Bases: object

A communication pool that enables communication between this party (the server) and other parties (the clients).

__init__(key=None, cert=None, ca_cert=None, timeout=ClientTimeout(total=300, connect=None, sock_read=None, sock_connect=None, ceil_threshold=5), max_retries=-1)[source]

Initialises a pool.

The key and certificate paths are provided as arguments to ssl.SSLContext.load_cert_chain. The ca_cert is provided as argument to ssl.SSLContext.load_verify_locations. Please refer to https://docs.python.org/3/library/ssl.html#certificates to learn more about the expected files and their format.

Parameters:
  • key (Path | str | None) – path to the key to use in the ssl context

  • cert (Path | str | None) – path to the certificate to use in the ssl context

  • ca_cert (Path | str | None) – path to the certificate authority (CA) certificate to use in the ssl context

  • timeout (ClientTimeout) – default timeout for client connections

  • max_retries (int) – default maximum number of retries for sending a message (-1 for unbounded retries)
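The meaning of max_retries, with -1 standing for unbounded retries, can be illustrated with a small retry loop. This is only a sketch under stated assumptions, not the pool's actual implementation: send_with_retries and send_once are hypothetical names, and the sketch assumes that max_retries counts the attempts made after the first one.

```python
from typing import Callable


def send_with_retries(send_once: Callable[[], bool], max_retries: int = -1) -> int:
    """Call send_once until it succeeds and return the number of attempts made.

    Hypothetical helper: max_retries counts attempts after the first one,
    and a negative value means "keep retrying forever".
    """
    attempts = 0
    while True:
        attempts += 1
        if send_once():
            return attempts
        # Give up once the retry budget is used up (never for negative values).
        if max_retries >= 0 and attempts > max_retries:
            raise ConnectionError(f"giving up after {attempts} attempts")


# A stand-in transport that fails twice and then succeeds.
outcomes = iter([False, False, True])
attempts_used = send_with_retries(lambda: next(outcomes), max_retries=-1)
```

With max_retries=-1 the loop only ends on success, so a caller that needs an upper bound on waiting time would pass a non-negative value instead.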

add_http_client(name, addr, port=None, cert=None, **cert_kwargs)[source]

Add an HTTP Client to the pool. addr can be either an IP address or a hostname.

Parameters:
  • name (str) – name of the client

  • addr (str) – IP address or hostname of the client

  • port (int | None) – port of the client

  • cert (Path | str | None) – path to the certificate of the client to be used for identification

  • **cert_kwargs (Any) – optional keyword arguments, used to set the “type” argument of OpenSSL.crypto.load_certificate

Raises:

ImportError – required dependencies for client identification through certificates are missing

Return type:

None

add_http_server(port=None, addr='0.0.0.0', external_port=None)[source]

Add an HTTP Server to the pool.

Parameters:
  • addr (str) – IP address of the server

  • port (int | None) – the port to bind to. In case of port forwarding, this is the internal port

  • external_port (int | None) – optional external port that can be set in case of port forwarding. In that case, the external port only serves as identification of this sender to other parties. It should be equal to the port that is visible to other parties (i.e. the port that other parties will send their messages to).

Return type:

None
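The relation between port and external_port under port forwarding can be sketched as follows. The helper resolve_ports is hypothetical and not part of the Pool API; it only illustrates the rule described above, namely that the server binds to port while other parties identify this sender by external_port when one is given.

```python
from typing import Optional, Tuple


def resolve_ports(port: int, external_port: Optional[int] = None) -> Tuple[int, int]:
    """Return (bind_port, advertised_port) for a server.

    Hypothetical helper: the server always binds to the internal port,
    and advertises the external port only if one is configured.
    """
    advertised = port if external_port is None else external_port
    return port, advertised


# Without port forwarding both ports coincide.
direct = resolve_ports(8080)
# With port forwarding, peers send to 443 while the server binds to 8080.
forwarded = resolve_ports(8080, external_port=443)
```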

arecv(handler_name, msg_id=None)[source]

Receive a message from a peer without waiting for it to arrive. The message is returned as a Future.

Parameters:
  • handler_name (str) – the name of the pool handler to receive a message from

  • msg_id (str | None) – an optional string identifying the message to collect

Return type:

Future[Any]

Returns:

a Future that holds the message from the peer.

arecv_all(handler_names=None, msg_id=None)[source]

Receives one message from each party without waiting for the messages to arrive. Each message is returned as a Future.

Parameters:
  • handler_names (Optional[Iterable[str]]) – List of pool handler names to receive a message from. If None, will receive one message from all parties.

  • msg_id (str | None) – an optional string identifying the message to collect

Return type:

tuple[tuple[str, Future[Any]], ...]

Returns:

Tuple of tuples containing first the party name and second the corresponding message or future.

asend(handler_name, message, msg_id=None, timeout=None, max_retries=None)[source]

Send a message to a peer asynchronously. Schedules the sending of the message and returns immediately. Delivery of the message is not confirmed.

Parameters:
  • handler_name (str) – the name of the pool handler to send a message to

  • message (Any) – the message to send

  • msg_id (str | None) – an optional string identifying the message to send

  • timeout (ClientTimeout | None) – timeout for the connection, if not set use default_timeout

  • max_retries (int | None) – maximum number of retries for sending the message, if not set use default_max_retries (-1 for unbounded retries)

Return type:

None
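The difference between scheduling a send and awaiting a send can be illustrated with plain asyncio. This is a sketch that does not use the Pool class: deliver is a hypothetical stand-in for the network transfer, and the scheduling via create_task is an assumption about how a "schedule and return immediately" call might behave.

```python
import asyncio
from typing import List, Tuple

delivered: List[str] = []


async def deliver(message: str) -> None:
    """Hypothetical stand-in for transferring a message over the network."""
    await asyncio.sleep(0.01)
    delivered.append(message)


async def demo() -> Tuple[List[str], List[str]]:
    loop = asyncio.get_running_loop()
    # Scheduled style: the transfer is queued and control returns at once,
    # so nothing has been delivered yet when the next line runs.
    task = loop.create_task(deliver("scheduled"))
    seen_after_scheduling = list(delivered)
    # Awaited style: execution continues only after the transfer finished.
    await deliver("awaited")
    seen_after_awaiting = list(delivered)
    # Wait for the scheduled transfer before leaving the event loop.
    await task
    return seen_after_scheduling, seen_after_awaiting


before, after = asyncio.run(demo())
```

In the scheduled style the caller learns nothing about the outcome unless it keeps a handle to the task, which matches the remark that delivery is not confirmed.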

async_broadcast(message, msg_id, handler_names=None, timeout=None, max_retries=None)[source]

Send a message to multiple other parties asynchronously. Serializes the message, schedules its sending, and returns immediately. Delivery of the message is not confirmed.

Parameters:
  • message (Any) – the message to send

  • msg_id (str) – a string identifying the message to send

  • handler_names (Optional[Iterable[str]]) – the names of the pool handlers to send a message to (if None, will broadcast to all known handlers)

  • timeout (ClientTimeout | None) – timeout for the connection, if not set use default_timeout

  • max_retries (int | None) – maximum number of retries for sending the message, if not set use default_max_retries (-1 for unbounded retries)

Return type:

None

async broadcast(message, msg_id, handler_names=None, timeout=None, max_retries=None)[source]

Send a message to multiple other parties synchronously.

Parameters:
  • message (Any) – the message to send

  • msg_id (str) – a string identifying the message to send

  • handler_names (Optional[Iterable[str]]) – the names of the pool handlers to send a message to (if None, will broadcast to all known handlers)

  • timeout (ClientTimeout | None) – timeout for the connection, if not set use default_timeout

  • max_retries (int | None) – maximum number of retries for sending the message, if not set use default_max_retries (-1 for unbounded retries)

Return type:

None

static create_ssl_context(key, cert, ca_cert=None, server=False)[source]

Create an SSL context.

The key and certificate paths are provided as arguments to ssl.SSLContext.load_cert_chain. The ca_cert is provided as argument to ssl.SSLContext.load_verify_locations. Please refer to https://docs.python.org/3/library/ssl.html#certificates to learn more about the expected files and their format.

Parameters:
  • key (Path | str | None) – path to the key to use in the ssl context

  • cert (Path | str | None) – path to the certificate to use in the ssl context

  • ca_cert (Path | str | None) – path to the certificate authority (CA) certificate to use in the ssl context

  • server (bool) – boolean stating whether we need a server context or not (client)

Return type:

SSLContext | None

Returns:

an SSL context or None
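How the three paths map onto the standard library calls named above can be sketched as follows. This is a minimal illustration and not the library's implementation: build_ssl_context is a hypothetical name, returning None when key or cert is missing is an assumption based on the documented return type, and the choice of ssl.create_default_context is likewise an assumption.

```python
import ssl
from pathlib import Path
from typing import Optional, Union

PathLike = Union[Path, str]


def build_ssl_context(
    key: Optional[PathLike],
    cert: Optional[PathLike],
    ca_cert: Optional[PathLike] = None,
    server: bool = False,
) -> Optional[ssl.SSLContext]:
    """Build an SSL context from file paths, or return None without key/cert."""
    if key is None or cert is None:
        # Assumption: without a key pair there is nothing to configure.
        return None
    # CLIENT_AUTH yields a server-side context, SERVER_AUTH a client-side one.
    purpose = ssl.Purpose.CLIENT_AUTH if server else ssl.Purpose.SERVER_AUTH
    context = ssl.create_default_context(purpose)
    # Our own certificate and private key.
    context.load_cert_chain(certfile=str(cert), keyfile=str(key))
    if ca_cert is not None:
        # The CA certificate used to verify the other party.
        context.load_verify_locations(cafile=str(ca_cert))
    return context


# No key or certificate given: no SSL context is created.
no_context = build_ssl_context(None, None)
```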

static get_port(ssl_ctx)[source]

Returns a port number that depends on whether an SSL context is provided.

Parameters:

ssl_ctx (SSLContext | None) – an ssl context

Return type:

int

Returns:

a port number
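One plausible way to pick a port from the presence of an SSL context is sketched below. The function default_port is hypothetical, and the values 443 and 80 are an assumption based on the conventional HTTPS and HTTP ports; the source does not state which numbers get_port returns.

```python
import ssl
from typing import Optional


def default_port(ssl_ctx: Optional[ssl.SSLContext]) -> int:
    """Return the conventional HTTPS port with SSL, the HTTP port without."""
    # Assumption: 443 and 80 are the standard HTTPS and HTTP ports.
    return 443 if ssl_ctx is not None else 80


plain_port = default_port(None)
secure_port = default_port(ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT))
```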

async recv(handler_name, msg_id=None, timeout=None)[source]

Receive a message asynchronously from a peer. Waits until the message has arrived or the timeout expires.

Parameters:
  • handler_name (str) – the name of the pool handler to receive a message from

  • msg_id (str | None) – an optional string identifying the message to collect

  • timeout (float | None) – maximum time in seconds to wait for the message to be received. Waits indefinitely if None.

Raises:

TimeoutError – message was not received before timeout

Return type:

Any

Returns:

the message from peer.
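The timeout behaviour described above can be illustrated with asyncio.wait_for applied to a Future. This sketch does not use the Pool class: wait_for_message is a hypothetical helper and the futures stand in for messages that may or may not arrive.

```python
import asyncio
from typing import Any, Optional, Tuple


async def wait_for_message(inbox: "asyncio.Future[Any]", timeout: Optional[float] = None) -> Any:
    """Wait for the future to resolve; timeout=None waits indefinitely."""
    return await asyncio.wait_for(inbox, timeout)


async def demo() -> Tuple[Any, bool]:
    loop = asyncio.get_running_loop()
    # A message that arrives shortly after we start waiting.
    arriving = loop.create_future()
    loop.call_later(0.01, arriving.set_result, "hello")
    received = await wait_for_message(arriving, timeout=1.0)

    # A message that never arrives, so the wait times out.
    never = loop.create_future()
    try:
        await wait_for_message(never, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return received, timed_out


received, timed_out = asyncio.run(demo())
```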

async recv_all(handler_names=None, msg_id=None, timeout=None)[source]

Method that receives one message for each party in an asynchronous fashion.

Parameters:
  • handler_names (Optional[Iterable[str]]) – List of pool handler names to receive a message from. If None, will receive one message from all parties.

  • msg_id (str | None) – an optional string identifying the message to collect

  • timeout (float | None) – maximum time in seconds to wait for the message to be received. Waits indefinitely if None.

Raises:

TimeoutError – message was not received before timeout

Return type:

list[tuple[str, Any]]

Returns:

List of tuples containing first the party name and second the corresponding message.
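Collecting one message per party into (name, message) pairs can be sketched with asyncio.gather. This is an illustration only and not the library's implementation: collect_all and the party names are hypothetical, and the futures stand in for pending messages.

```python
import asyncio
from typing import Any, Dict, List, Tuple


async def collect_all(pending: Dict[str, "asyncio.Future[Any]"]) -> List[Tuple[str, Any]]:
    """Wait for every pending message and pair it with its party name."""
    names = list(pending)
    # gather returns results in the order of its arguments,
    # regardless of the order in which the messages arrive.
    messages = await asyncio.gather(*(pending[name] for name in names))
    return list(zip(names, messages))


async def demo() -> List[Tuple[str, Any]]:
    loop = asyncio.get_running_loop()
    pending = {"alice": loop.create_future(), "bob": loop.create_future()}
    # Bob's message arrives before Alice's.
    loop.call_later(0.02, pending["alice"].set_result, 1)
    loop.call_later(0.01, pending["bob"].set_result, 2)
    return await collect_all(pending)


results = asyncio.run(demo())
# results == [("alice", 1), ("bob", 2)]
```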

async send(handler_name, message, msg_id=None, timeout=None, max_retries=None)[source]

Send a message to a peer synchronously.

Parameters:
  • handler_name (str) – the name of the pool handler to send a message to

  • message (Any) – the message to send

  • msg_id (str | None) – an optional string identifying the message to send

  • timeout (ClientTimeout | None) – timeout for the connection, if not set use default_timeout

  • max_retries (int | None) – maximum number of retries for sending the message, if not set use default_max_retries (-1 for unbounded retries)

Return type:

None

async shutdown()[source]

Gracefully shut down all connections/listeners in the pool.

Return type:

None

update_msg_prefix(msg_prefix)[source]

Updates the message prefix that is used for all clients in the pool. This message prefix is used for both sending and receiving of messages.

Parameters:

msg_prefix (str | None) – the new prefix to use

Return type:

None