Source code for templates.random_sources.process_source

"""
Object that provides randomness from processes.
"""

from __future__ import annotations

import multiprocessing as mp
import os
import sys
import time
from collections.abc import Iterable, Iterator
from concurrent.futures import Future, ProcessPoolExecutor, as_completed
from itertools import chain
from threading import Lock
from types import TracebackType
from typing import Any, Callable

from tno.mpc.encryption_schemes.templates._randomness_manager import (
    RR,
    PauseIteration,
    RandomnessSource,
)

if sys.version_info < (3, 11):
    from typing_extensions import Self
else:
    from typing import Self

# True when running on Python 3.9 or newer. ProcessSource uses this flag to decide
# whether it can pass ``cancel_futures`` to the pool shutdown or whether it has to
# keep track of the submitted futures and cancel them manually.
PYTHON_GE_39 = sys.version_info >= (3, 9)
# Number of workers returned by _get_max_workers when the environment variable
# POOL_EXECUTOR_MAX_WORKERS is unset or cannot be parsed as an integer.
_DEFAULT_MAX_WORKERS = 1


class FakeList(list[Any]):
    """
    List that completely discards anything you may want to add.

    Used as a stand-in for a real list of futures when those futures do not need to be
    tracked: every way of adding elements is a no-op, so the list always stays empty and
    never keeps a reference to the objects handed to it.
    """

    def __init__(self, items: Iterable[Any] = (None,)) -> None:
        """
        Create an empty list, ignoring the provided items.

        :param items: Ignored.
        """
        # Deliberately do not call list.__init__, so that the items are never stored.

    def extend(self, values: Iterable[Any]) -> None:
        """
        Discard the given values.

        :param values: Ignored.
        """

    def append(self, value: Any) -> None:
        """
        Discard the given value.

        :param value: Ignored.
        """

    def insert(self, index: Any, value: Any) -> None:
        """
        Discard the given value.

        :param index: Ignored.
        :param value: Ignored.
        """

    def __iadd__(self, values: Iterable[Any]) -> "FakeList":  # type: ignore[override,misc]
        """
        Discard the given values (``fake_list += values``).

        :param values: Ignored.
        :return: This (still empty) list.
        """
        return self
# Inherit from protocol for earlier detection of erroneous type annotations.
class ProcessSource(RandomnessSource[RR]):
    """
    Object for providing randomness from processes that repeatedly execute a randomness
    generating function.

    Implements tno.mpc.encryption_schemes.templates._randomness_manager.RandomnessSource.
    """

    def __init__(
        self,
        generation_function: Callable[[], RR],
        amount: int = 0,
        max_workers: int | None = None,
        debug: bool = False,
    ) -> None:
        """
        Object that starts processes to generate and yield random values.

        This construction starts generation workers that generate new randomness using the
        given generation function. This happens in separate processes to avoid blocking and
        speed up the generation.

        :param generation_function: Unbound callable object (e.g. function or static method)
            that generates one random value.
        :param amount: Upper bound on the total amount of randomizations to generate.
        :param max_workers: Number of workers that generate randomizations in parallel.
            Should be at least 1. If None, the value is determined by _get_max_workers, which
            consults the environment variable `POOL_EXECUTOR_MAX_WORKERS`.
        :param debug: Flag to determine whether debug information should be displayed. The
            flag is stored on the instance.
        :raise ValueError: Fewer than one worker was requested.
        """
        self._generation_function = generation_function
        self._nr_requested = amount
        self._nr_yielded = 0

        # Only fall back to the environment/default when no value was provided. A plain
        # truthiness test would silently replace an explicit 0 and skip the validation below.
        if max_workers is None:
            max_workers = _get_max_workers()
        if max_workers is not None and max_workers < 1:
            raise ValueError(
                "Requires at least one worker to generate randomness, but "
                f"{max_workers} workers were requested."
            )
        self._max_workers = max_workers

        self._pool: ProcessPoolExecutor | None = None
        # keep track of futures in python<3.9 to cancel them manually later
        self._futures: list[Future[RR]] = FakeList() if PYTHON_GE_39 else []
        self._randomness: Iterator[Future[RR]] = iter([])
        self._lock = Lock()
        self._debug = debug

    @property
    def nr_requested(self) -> int:
        """
        Number of random elements requested.

        :return: Number of random elements requested.
        """
        return self._nr_requested

    @property
    def nr_yielded(self) -> int:
        """
        Number of random elements yielded.

        :return: Number of random elements yielded.
        """
        return self._nr_yielded

    def __enter__(self) -> Self:
        self.open()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        self.close()

    @property
    def pool(self) -> ProcessPoolExecutor:
        """
        Get the pool of the ProcessSource.

        :raise ValueError: No pool has yet been initialized.
        :return: Pool of the current object.
        """
        if self._pool is None:
            raise ValueError("No pool has yet been initialized")
        return self._pool

    def open(self) -> None:
        """
        Instantiate a pool for processes and set them to work.

        All randomness requested so far (see nr_requested) is submitted to the new pool.
        """
        # "spawn" will be the default mp start method from Python 3.14 onwards
        # Before Python 3.14, on Linux, "fork" is the default. This causes issues, see
        # e.g. https://ci.tno.nl/gitlab/pet/lab/mpc/python-packages/microlibs/protocols/protocols/-/issues/11.
        # For more detailed information, see
        # https://web.archive.org/web/20250210143839/https://discuss.python.org/t/concerns-regarding-deprecation-of-fork-with-alive-threads/33555
        ctx = mp.get_context("spawn")
        self._pool = ProcessPoolExecutor(max_workers=self._max_workers, mp_context=ctx)
        self._submit_to_pool(self.nr_requested)

    def get_one(self) -> RR:
        """
        Get one random value.

        :raise PauseIteration: All requested randomness was already yielded.
        :raise ValueError: No pool of processes instantiated.
        :return: Single random value.
        """
        if self._pool is None:
            raise ValueError(
                "Attempted to read randomness from pool, but pool was not instantiated. Make "
                "sure to first call ProcessSource.open()."
            )
        # Advancing the shared iterator is not thread-safe, and neither is updating the
        # counter of yielded values, so both happen while holding the lock.
        with self._lock:
            try:
                randomness = next(self._randomness).result()
            except StopIteration:
                raise PauseIteration(
                    "Process source is depleted. More randomness can be required through "
                    "ProcessSource.increase_requested."
                ) from None
            self._nr_yielded += 1
        return randomness

    def close(self) -> None:
        """
        Shuts down all processes.

        :raise ValueError: No pool has yet been initialized.
        """
        if PYTHON_GE_39:
            self.pool.shutdown(wait=False, cancel_futures=True)
        else:
            # For python<3.9, concurrent.futures.ProcessPoolExecutor.shutdown() does not
            # accept the cancel_futures but instead awaits completion of all scheduled Futures.
            # We instead store the futures and cancel them manually. Then we await the result
            # to prevent hanging (tests) after shutdown.
            #
            # The pool shutdown may hang if futures are created and cancelled immediately after. We
            # introduce a short delay to prevent the hang
            # (https://github.com/python/cpython/issues/94440).
            time.sleep(0.001)
            for fut in self._futures:
                fut.cancel()
            self.pool.shutdown(wait=True)

    def increase_requested(self, amount: int) -> None:
        """
        Increase the amount of randomness to be generated by the process pool.

        If no pool has been opened yet, only the requested amount is recorded; the work is
        submitted once open() is called.

        :param amount: Amount to be generated additionally.
        """
        if self._pool is not None:
            self._submit_to_pool(amount)
        self._nr_requested += amount

    def _submit_to_pool(self, amount: int) -> None:
        """
        Require the pool to generate more randomness.

        :param amount: Amount of randomness to be generated additionally.
        :raise ValueError: No pool has yet been initialized.
        """
        futures = [self.pool.submit(self._generation_function) for _ in range(amount)]
        self._futures.extend(futures)
        # Append the new futures (in order of completion) behind the ones still pending.
        self._randomness = chain(self._randomness, as_completed(futures))

    def __str__(self) -> str:
        return (
            f"{self.__class__.__name__}(nr_workers={self._max_workers}, "
            f"nr_requested={self.nr_requested})"
        )
def _get_max_workers() -> int | None:
    """
    Get the maximum number of workers using the environment variable
    `POOL_EXECUTOR_MAX_WORKERS`.

    :return: If `POOL_EXECUTOR_MAX_WORKERS == "auto"` (case-insensitive), then return `None`
        so that `ProcessPoolExecutor` determines the maximum from the system properties.
        Alternatively, `POOL_EXECUTOR_MAX_WORKERS` is parsed into an integer and returned. If
        the variable is unset or does not parse, return `_DEFAULT_MAX_WORKERS`.
    """
    # The docstring above is intentionally a plain string: an f-string is not recognised
    # as a docstring and would be re-evaluated on every call.
    configured = os.getenv("POOL_EXECUTOR_MAX_WORKERS")
    if configured is None:
        return _DEFAULT_MAX_WORKERS
    if configured.lower() == "auto":
        return None
    try:
        return int(configured)
    except ValueError:
        return _DEFAULT_MAX_WORKERS