Source code for paillier.paillier

"""
Implementation of the Asymmetric Encryption Scheme known as Paillier.
"""

from __future__ import annotations

import hashlib
import numbers
import sys
import typing
import warnings
from dataclasses import asdict, dataclass
from functools import cached_property, lru_cache, partial
from secrets import randbelow
from typing import Any, TypedDict, Union, cast, get_args

from tno.mpc.encryption_schemes.templates import (
    AdditiveHomomorphicCiphertext,
    AdditiveHomomorphicEncryptionScheme,
    AsymmetricEncryptionScheme,
    EncodedPlaintext,
    EncryptionSchemeWarning,
    PublicKey,
    RandomizedEncryptionSchemeWarning,
    SecretKey,
    SerializationError,
)
from tno.mpc.encryption_schemes.templates.exceptions import (
    WARN_INEFFICIENT_HOM_OPERATION,
)
from tno.mpc.encryption_schemes.utils import FixedPoint, mod_inv, pow_mod, randprime

if sys.version_info < (3, 11):
    from typing_extensions import NotRequired
else:
    from typing import NotRequired

# Check to see if the communication module is available
try:
    from tno.mpc.communication import RepetitionError, Serializer
    from tno.mpc.communication.packers import DeserializerOpts, SerializerOpts

    COMMUNICATION_INSTALLED = True
except ModuleNotFoundError:
    COMMUNICATION_INSTALLED = False
except ImportError as exc:
    raise ImportError(
        "Detected an incompatible version of 'tno.mpc.communication'. Please install this package with the extra 'communication', e.g. 'tno.mpc.encryption_schemes.paillier[communication]'."
    )


fxp = FixedPoint.fxp


WARN_UNFRESH_SERIALIZATION = (
    "Serializer identified and rerandomized a non-fresh ciphertext."
)


[docs] @dataclass(frozen=True, eq=True) class PaillierPublicKey(PublicKey): r""" PublicKey for the Paillier encryption scheme. Constructs a new Paillier public key $(n, g)$, should have $n=pq$, with $p, q$ prime, and $g \in \mathbb{Z}^*_{n^2}$. :param n: Modulus $n$ of the plaintext space. :param g: Plaintext base $g$ for encryption. """ n: int g: int @cached_property def n_squared(self) -> int: """ Modulus of the ciphertext space. """ return self.n**2
[docs] @lru_cache def id(self) -> int: """ Identifier of this specific key that is consistent over system architectures. :return: Representation of SHA-256 hash of object-defining attribute values. """ # We use hashlib to ensure consistent hashes over different system architectures. h = hashlib.sha256() h.update(_to_bytes(self.n)) h.update(_to_bytes(self.g)) return int.from_bytes(h.digest(), "big")
# region Serialization logic
[docs] def serialize(self, _opts: SerializerOpts) -> dict[str, Any]: r""" Serialization function for public keys, which will be passed to the communication module. :raise SerializationError: When communication library is not installed. :return: Serialized version of this PaillierPublicKey. """ if not COMMUNICATION_INSTALLED: raise SerializationError() return asdict(self)
[docs] @staticmethod def deserialize(obj: dict[str, Any], _opts: DeserializerOpts) -> PaillierPublicKey: r""" Deserialization function for public keys, which will be passed to the communication module. :param obj: Serialized version of a PaillierPublicKey. :raise SerializationError: When communication library is not installed. :return: Deserialized PaillierPublicKey from the given dict. """ if not COMMUNICATION_INSTALLED: raise SerializationError() return PaillierPublicKey(**obj)
# endregion
[docs] @dataclass(frozen=True, eq=True) class PaillierSecretKey(SecretKey): r""" SecretKey for the Paillier encryption scheme. Constructs a new Paillier secret key $(\lambda, \mu)$, also contains $n$. Should have $n=pq$, with $p, q$ prime, $\lambda = \text{lcm}(p-1, q-1)$, and $\mu = (L(g^\lambda \mod n^2))^{-1} \mod n$, where $L(\cdot)$ is defined as $L(x) = (x-1)/n$. :param lambda_: Decryption exponent $\lambda$ of the ciphertext. :param mu: Decryption divisor $\mu$ for the ciphertext. :param n: Modulus $n$ of the plaintext space. """ lambda_: int mu: int n: int # region Serialization logic
[docs] def serialize(self, _opts: SerializerOpts) -> dict[str, Any]: r""" Serialization function for secret keys, which will be passed to the communication module. :raise SerializationError: When communication library is not installed. :return: Serialized version of this PaillierSecretKey. """ if not COMMUNICATION_INSTALLED: raise SerializationError() return asdict(self)
[docs] @staticmethod def deserialize(obj: dict[str, Any], _opts: DeserializerOpts) -> PaillierSecretKey: r""" Deserialization function for public keys, which will be passed to the communication module :param obj: Serialized version of a PaillierSecretKey. :raise SerializationError: When communication library is not installed. :return: Deserialized PaillierSecretKey from the given dict. """ if not COMMUNICATION_INSTALLED: raise SerializationError() return PaillierSecretKey(**obj)
# endregion Plaintext = Union[numbers.Integral, float, FixedPoint]
[docs] class PaillierCiphertext(AdditiveHomomorphicCiphertext[Plaintext, int, int]): """ Ciphertext for the Paillier asymmetric encryption scheme. This ciphertext is rerandomizable and supports homomorphic operations. """ scheme: Paillier
[docs] def __init__( self: PaillierCiphertext, raw_value: int, scheme: Paillier, *, fresh: bool = False, ): r""" Construct a RandomizableCiphertext, with the given value for the given EncryptionScheme. :param raw_value: PaillierCiphertext value $c \in \mathbb{Z}_{n^2}$. :param scheme: Paillier scheme that is used to encrypt this ciphertext. :param fresh: Indicates whether fresh randomness is already applied to the raw_value. :raise TypeError: When the given scheme is not a Paillier scheme. """ if not isinstance(scheme, Paillier): raise TypeError(f"expected Paillier scheme, got {type(scheme)}") super().__init__(raw_value, scheme, fresh=fresh)
[docs] def apply_randomness(self: PaillierCiphertext, randomization_value: int) -> None: """ Rerandomize this ciphertext using the given random value. :param randomization_value: Random value used for rerandomization. """ modulus = self.scheme.public_key.n_squared self._raw_value *= randomization_value self._raw_value %= modulus
[docs] def __eq__(self, other: object) -> bool: """ Compare this PaillierCiphertext with another to determine (in)equality. :param other: Object to compare this PaillierCiphertext with. :raise TypeError: When other object is not a PaillierCiphertext. :return: Boolean value representing (in)equality of both objects. """ if not isinstance(other, PaillierCiphertext): return NotImplemented return self._raw_value == other._raw_value and self.scheme == other.scheme
[docs] def copy(self: PaillierCiphertext) -> PaillierCiphertext: """ Create a copy of this Ciphertext, with the same value and scheme. The copy is not randomized and is considered not fresh. :return: Copied PaillierCiphertext. """ return PaillierCiphertext(raw_value=self._raw_value, scheme=self.scheme)
# region Serialization logic
[docs] class SerializedPaillierCiphertext(TypedDict): value: int scheme_id: int
[docs] def serialize( self, _opts: SerializerOpts ) -> PaillierCiphertext.SerializedPaillierCiphertext: r""" Serialization function for Paillier ciphertexts, which will be passed to the communication module. If the ciphertext is not fresh, it is randomized before serialization. After serialization, it is always marked as not fresh for security reasons. :raise SerializationError: When communication library is not installed. :return: Serialized version of this PaillierCiphertext. """ if not COMMUNICATION_INSTALLED: raise SerializationError() if not self.fresh: warnings.warn( WARN_UNFRESH_SERIALIZATION, RandomizedEncryptionSchemeWarning, stacklevel=2, ) self.randomize() self._fresh = False return { "value": self._raw_value, "scheme_id": self.scheme.identifier, }
[docs] @staticmethod def deserialize( obj: PaillierCiphertext.SerializedPaillierCiphertext, _opts: DeserializerOpts, ) -> PaillierCiphertext: r""" Deserialization function for Paillier ciphertexts, which will be passed to the communication module. :param obj: Serialized version of a PaillierCiphertext. :raise SerializationError: When communication library is not installed. :return: Deserialized PaillierCiphertext from the given dict. """ if not COMMUNICATION_INSTALLED: raise SerializationError() try: scheme = Paillier.from_id(obj["scheme_id"]) except KeyError as exc: raise SerializationError( "The scheme that is associated with the provided serialized PaillierCiphertext is not known. Please ensure that the corresponding scheme is deserialized first." ) from exc return PaillierCiphertext( raw_value=obj["value"], scheme=scheme, )
# endregion
[docs] class Paillier( AsymmetricEncryptionScheme[ PaillierPublicKey, PaillierSecretKey, Plaintext, int, PaillierCiphertext, ], AdditiveHomomorphicEncryptionScheme[ tuple[PaillierPublicKey, PaillierSecretKey], Plaintext, int, PaillierCiphertext, int, ], ): """ Paillier Encryption Scheme. This is an AsymmetricEncryptionScheme, with a public and secret key. This is also a AdditiveHomomorphicEncryptionScheme, thus having internal randomness generation, allowing for the use of precomputed randomness, and support for addition of ciphertexts. """ public_key: PaillierPublicKey secret_key: PaillierSecretKey
[docs] def __init__( self, public_key: PaillierPublicKey, secret_key: PaillierSecretKey | None, precision: int = 0, share_secret_key: bool = False, debug: bool = False, ): """ Construct a new Paillier encryption scheme, with the given keypair, randomness object, precision for fixed point encryption. :param public_key: Public key for this Paillier Scheme. :param secret_key: Secret Key for this Paillier Scheme. :param precision: Fixed point precision of this encoding, in decimal places. :param share_secret_key: Boolean value stating whether or not the secret key should be included in serialization. This should only be set to True if one is really sure of it. :param debug: flag to determine whether debug information should be displayed. """ self._generate_randomness = partial( # type: ignore[method-assign] self._generate_randomness_from_args, public_n=public_key.n, public_n_squared=public_key.n_squared, ) AsymmetricEncryptionScheme.__init__( self, public_key=public_key, secret_key=secret_key ) AdditiveHomomorphicEncryptionScheme.__init__( self, debug=debug, ) self.precision = precision self.max_value = FixedPoint(public_key.n // 2, precision=precision) self.min_value = -self.max_value # Variable that determines whether a secret key is sent when the scheme is sent # over a communication channel self.share_secret_key = share_secret_key if self.identifier not in self._instances: self.save_globally()
[docs] @staticmethod def generate_key_material( key_length: int, ) -> tuple[ PaillierPublicKey, PaillierSecretKey ]: # pylint: disable=arguments-differ r""" Method to generate key material (PaillierPublicKey and PaillierPrivateKey). :param key_length: Bit length of the public key $n$. :return: Tuple with first the Public Key and then the Secret Key. """ p = 1 q = 1 n = p * q while n.bit_length() != key_length: p = randprime(2 ** (key_length // 2 - 1), 2 ** (key_length // 2)) q = randprime(2 ** (key_length // 2 - 1), 2 ** (key_length // 2)) while p == q: q = randprime(2 ** (key_length // 2 - 1), 2 ** (key_length // 2)) n = p * q lambda_ = (p - 1) * (q - 1) g = n + 1 mu = mod_inv(lambda_, n) # use g = n + 1 # mu = mod_inv(Paillier.func_l(pow(g, lambda_, n**2), n), n) # use random g return PaillierPublicKey(n, g), PaillierSecretKey(lambda_, mu, n)
[docs] def encode(self, plaintext: Plaintext) -> EncodedPlaintext[int]: """ Encode a float or int with the given precision of this instantiation. Allows for positive and negative numbers. :param plaintext: Plaintext to be encoded. :raise ValueError: If the plaintext is outside the supported range of this Paillier instance. :return: EncodedPlaintext object containing the encoded value. """ if not self.min_value <= plaintext <= self.max_value: raise ValueError( f"This encoding scheme only supports values in the range [{self.min_value};" f"{self.max_value}], {plaintext} is outside that range." ) plaintext_fxp = fxp(plaintext, self.precision) return EncodedPlaintext(plaintext_fxp.value, self)
[docs] def decode(self, encoded_plaintext: EncodedPlaintext[int]) -> Plaintext: """ Decode an EncodedPlaintext given the specified precision of this instantiation. :param encoded_plaintext: Plaintext to be decoded. :return: decoded Plaintext value """ value = ( encoded_plaintext.value if 2 * encoded_plaintext.value <= self.public_key.n else encoded_plaintext.value - self.public_key.n ) return FixedPoint(value, self.precision)
def _unsafe_encrypt_raw( self, plaintext: EncodedPlaintext[int], ) -> PaillierCiphertext: r""" Encrypts an encoded (raw) plaintext value, but does not apply randomization. Given a raw plaintext message $m \in \mathbb{Z}_n$, we compute the ciphertext value as $c = g^m \mod n^2$. :param plaintext: EncodedPlaintext object containing the raw value $m \in \mathbb{Z}_n$ to be encrypted. :return: Non-randomized PaillierCiphertext object containing the encrypted plaintext $c$. """ return PaillierCiphertext( 1 + plaintext.value * self.public_key.n, self ) # use g = n + 1 def _decrypt_raw(self, ciphertext: PaillierCiphertext) -> EncodedPlaintext[int]: r""" Decrypts an ciphertext to its encoded plaintext value. Given a ciphertext $c \in \mathbb{Z}^*_{n^2}$, we compute the raw plaintext message as $m = L(c^\lambda \mod n^2) \cdot \mu \mod n. :param ciphertext: PaillierCiphertext object containing the ciphertext $c$ to be decrypted. :return: EncodedPlaintext object containing the encoded decryption $m$ of the ciphertext. """ c = ciphertext.peek_value() c_lambda = pow_mod(c, self.secret_key.lambda_, self.public_key.n_squared) m = Paillier.func_l(c_lambda, self.secret_key.n) m *= self.secret_key.mu m %= self.secret_key.n return EncodedPlaintext(m, self)
[docs] def neg(self, ciphertext: PaillierCiphertext) -> PaillierCiphertext: r""" Negate the underlying plaintext of this ciphertext. If the original plaintext of this ciphertext was 5. this method returns the ciphertext that has -5 as underlying plaintext. Given a ciphertext $c$ we compute the negated ciphertext $c'$ such that $c \cdot c' = 1 \mod n^2$. The resulting ciphertext is fresh only if the input was fresh. The input is marked as non-fresh after the operation. :param ciphertext: PaillierCiphertext $c$ of which the underlying plaintext should be negated. :return: PaillierCiphertext $c'$ corresponding to the negated plaintext. """ if new_ciphertext_fresh := ciphertext.fresh: warnings.warn( WARN_INEFFICIENT_HOM_OPERATION, RandomizedEncryptionSchemeWarning, stacklevel=2, ) # ciphertext.get_value() automatically marks ciphertext as not fresh return PaillierCiphertext( mod_inv(ciphertext.get_value(), self.public_key.n_squared), self, fresh=new_ciphertext_fresh, )
[docs] def add( self, ciphertext: PaillierCiphertext, other: PaillierCiphertext | Plaintext, ) -> PaillierCiphertext: r""" Secure addition. If other is another PaillierCiphertext $c_2$, add the underlying plaintext value of ciphertext $c_1$ with the underlying plaintext value of other. If it is a Plaintext, we add the plaintext value $m_2$ to ciphertext, by first encryption it and obtaining $c_2 = Enc(m_2)$. We then compute the result as $c' = c_1 \cdot c_2 \mod n^2$. The resulting ciphertext is fresh only if at least one of the inputs was fresh. Both inputs are marked as non-fresh after the operation. :param ciphertext: First PaillierCiphertext $c_1$ of which the underlying plaintext is added. :param other: Either a second PaillierCiphertext $c_2$ of which the underlying plaintext is added to the first. Or a plaintext $m_2$ that is added to the underlying plaintext of the first. :raise TypeError: When other is a ciphertext with a different scheme than ciphertext. :return: A PaillierCiphertext $c'$ containing the encryption of the addition of both values. """ if isinstance(other, get_args(Plaintext)): other = self.unsafe_encrypt(cast(Plaintext, other)) elif ciphertext.scheme != cast(PaillierCiphertext, other).scheme: raise TypeError( "The scheme of your first ciphertext is not equal to the scheme of your second " "ciphertext." ) other = cast(PaillierCiphertext, other) if new_ciphertext_fresh := ciphertext.fresh or other.fresh: warnings.warn( WARN_INEFFICIENT_HOM_OPERATION, RandomizedEncryptionSchemeWarning, stacklevel=2, ) # ciphertext.get_value() automatically marks ciphertext as not fresh return PaillierCiphertext( ciphertext.get_value() * other.get_value() % self.public_key.n_squared, self, fresh=new_ciphertext_fresh, )
[docs] def mul( self, ciphertext: PaillierCiphertext, other: PaillierCiphertext | Plaintext ) -> PaillierCiphertext: """ Multiply the underlying plaintext value of ciph $c$ with the given scalar $s$. We obtain the result by computing $c' = c^s$. The resulting ciphertext is fresh only if the input was fresh. The input is marked as non-fresh after the operation. :param ciphertext: PaillierCiphertext $c$ of which the underlying plaintext is multiplied. :param other: A scalar $s$ with which the plaintext underlying ciph should be multiplied. :raise NotImplementedError: When other is not an integer. :return: PaillierCiphertext $c'$ containing the encryption of the product of both values. """ # This check is necessary to support both built-in integers and gmpy2 integers # - The mpz class from gmpy2 is registered as a virtual subclass of numbers.Integral. Static type checkers # do not understand such dynamic typing constructions, so the stubs define mpz as a subclass of int, but # the runtime check is done on `numbers.Integral`. if not isinstance(other, numbers.Integral): raise NotImplementedError( f"Type of scalar (second multiplicand) should be an integer and not" f" {type(other)}." ) if (other := cast(int, other)) < 0: ciphertext = self.neg(ciphertext) other = -other if new_ciphertext_fresh := ciphertext.fresh: warnings.warn( WARN_INEFFICIENT_HOM_OPERATION, RandomizedEncryptionSchemeWarning, stacklevel=2, ) # ciphertext.get_value() automatically marks ciphertext as not fresh return PaillierCiphertext( pow_mod(ciphertext.get_value(), other, self.public_key.n_squared), self, fresh=new_ciphertext_fresh, )
[docs] def __eq__(self, other: object) -> bool: """ Compare this Paillier scheme with another to determine (in)equality. Does not take the secret key into account as it might not be known and the public key combined with the precision should be sufficient to determine equality. :param other: Object to compare this Paillier scheme with. :return: Boolean value representing (in)equality of both objects. """ if not isinstance(other, Paillier): return NotImplemented return self.identifier == other.identifier
[docs] def __hash__(self) -> int: """ Hash this Paillier scheme. :return: Hash of this Paillier scheme. """ return hash((self.public_key, self.precision))
@staticmethod def _generate_randomness_from_args( public_n: int, public_n_squared: int | None = None ) -> int: r""" Method to generate randomness value $r^n \mod n^2$, from a random number $r \in_R \mathbb{Z}^*_{n}$ for Paillier. :param public_n: Modulus of the message space. :param public_n_squared: Square of public_n. Can be passed for efficiency reasons. :return: A random number. """ if not public_n_squared: public_n_squared = public_n**2 random_element = randbelow(public_n - 1) + 1 return pow_mod(random_element, public_n, public_n_squared)
[docs] def random_plaintext( self, lower_bound: Plaintext | None = None, upper_bound: Plaintext | None = None, ) -> FixedPoint: """ Generate a uniformly random plaintext from the given interval. :param lower_bound: Lower bound (inclusive), when no lower bound is given, the lowest value of the plaintext space is used. :param upper_bound: Upper bound (exclusive), when no lower bound is given, the first value that is higher than the maximum value of the plaintext space is used. :raise Warning: When the precision of `lower_bound` or `upper_bound` is larger than `self.precision`. :raise ValueError: When an interval larger than plaintext space or an empty interval is specified. :return: A uniformly random element from specified range represented as a fixed point number If range unspecified, yields a uniformly random fixed point number from plaintext space. """ if lower_bound is None: lower_bound = self.min_value if upper_bound is None: upper_bound = self.max_value + 10 ** (-self.precision) # scale upper bounds to the right precision lower_bound_scaled = fxp(lower_bound, target_precision=self.precision) upper_bound_scaled = fxp(upper_bound, target_precision=self.precision) # perform some checks on the bounds if lower_bound_scaled >= upper_bound_scaled: raise ValueError( f"The entered interval [{lower_bound_scaled}, {upper_bound_scaled}) is empty when trimmed to " f"{self.precision} decimals. Please enter a non-empty interval." ) if ( lower_bound_scaled < self.min_value or upper_bound_scaled > self.max_value + 10 ** (-self.precision) ): raise ValueError( f"This encoding scheme only supports values in the range [{self.min_value};" f"{self.max_value}]. Part of [{lower_bound_scaled}; {upper_bound_scaled}) is outside that range." ) if fxp(lower_bound).precision > self.precision: warnings.warn( EncryptionSchemeWarning( "The lower bound has more decimals than the precision of the scheme." ) ) if fxp(upper_bound).precision > self.precision: warnings.warn( EncryptionSchemeWarning( "The upper bound has more decimals than the precision of the scheme." ) ) return FixedPoint.random_range(lower_bound_scaled, upper_bound_scaled)
[docs] def sample_mask( self, lower_bound: Plaintext, upper_bound: Plaintext, security_level: int | None = None, ) -> FixedPoint: r""" Returns a random value to mask a plaintext message from a given message space of size $|M|$ with statistical security, given security parameter $\sigma$. The result will be a random mask from an interval with size $|M| \cdot 2^{\sigma}$ that is centered around 0. To read the mathematics and reasoning behind this, please take a look at the README. :param lower_bound: Integral lower bound for message space (inclusive). :param upper_bound: Integral upper bound for message space (exclusive). :param security_level: Security level $\sigma$ we require from additive masking, if `None` a mask with perfect security is returned. The security level should be a non-negative integer, denoting the number of bits of security. :raise Warning: When the precision of `lower_bound` or `upper_bound` is larger than `self.precision`. A warning is also returned when the chosen security level equals 0 (since that provides NO security). :raise ValueError: When an interval larger than plaintext space or an empty interval is specified or when an invalid security level is given. :return: A random fixed point number (with statistical security) to be used for masking. """ if security_level is not None and security_level < 0: raise ValueError( f"Security level has value {security_level} this is not supported. It should be either a" f" non-negative integer or `None`" ) # check precision of bounds if fxp(lower_bound).precision > self.precision: warnings.warn( EncryptionSchemeWarning( "The lower bound has more decimals than the precision of the scheme." ) ) if fxp(upper_bound).precision > self.precision: warnings.warn( EncryptionSchemeWarning( "The upper bound has more decimals than the precision of the scheme." ) ) # handle security_level None or 0. if security_level is None: return self.random_plaintext() if security_level == 0: warnings.warn( EncryptionSchemeWarning( "The used security level provides NO security. The returned mask will always be 0." ) ) return fxp(0, target_precision=self.precision) # scale upper bounds to the right precision lower_bound_scaled = fxp(lower_bound, target_precision=self.precision) upper_bound_scaled = fxp(upper_bound, target_precision=self.precision) # we create an interval of size interval_size * 2**(security_level) interval_size = upper_bound_scaled - lower_bound_scaled secure_upper_bound = interval_size << (security_level - 1) return self.random_plaintext(-secure_upper_bound, secure_upper_bound)
[docs] @staticmethod def func_l(input_x: int, n: int) -> int: r""" Paillier specific $L(\cdot)$ function: $L(x) = (x-1)/n$. :param input_x: input $x$ :param n: input $n$ (public key modulus) :return: value of $L(x) = (x-1)/n$. """ return (input_x - 1) // n
[docs] @classmethod def id_from_arguments( cls, public_key: PaillierPublicKey, precision: int = 0, ) -> int: """ Method that turns the arguments for the constructor into an identifier. This identifier is used to find constructor calls that would result in identical schemes. :param public_key: PaillierPublicKey of the Paillier instance. :param precision: Precision of the Paillier instance :return: Identifier of the Paillier instance """ # We use hashlib to ensure consistent hashes over different system architectures. h = hashlib.sha256() pk_id = public_key.id() h.update(pk_id.to_bytes(256 // 8, "big")) h.update(_to_bytes(precision)) return int.from_bytes(h.digest(), "big")
# region Serialization logic
[docs] class SerializedPaillier(TypedDict): prec: int pubkey: PaillierPublicKey seckey: NotRequired[PaillierSecretKey]
[docs] def serialize(self, _opts: SerializerOpts) -> Paillier.SerializedPaillier: r""" Serialization function for Paillier schemes, which will be passed to the communication module. The sharing of the secret key depends on the attribute share_secret_key. :raise SerializationError: When communication library is not installed. :return: Serialized version of this Paillier scheme. """ if not COMMUNICATION_INSTALLED: raise SerializationError() if self.share_secret_key: return self.serialize_with_secret_key() return self.serialize_without_secret_key()
[docs] def serialize_with_secret_key( self, ) -> Paillier.SerializedPaillier: """ Serialization function for Paillier schemes, that does include the secret key. :raise SerializationError: When communication library is not installed. :return: Serialized version of this Paillier scheme. """ if not COMMUNICATION_INSTALLED: raise SerializationError() return { "prec": self.precision, "pubkey": self.public_key, "seckey": self.secret_key, }
[docs] def serialize_without_secret_key(self) -> Paillier.SerializedPaillier: """ Serialization function for Paillier schemes, that does not include the secret key. :raise SerializationError: When communication library is not installed. :return: Serialized version of this Paillier scheme (without the secret key). """ if not COMMUNICATION_INSTALLED: raise SerializationError() return { "prec": self.precision, "pubkey": self.public_key, }
[docs] @staticmethod def deserialize( obj: Paillier.SerializedPaillier, _opts: DeserializerOpts, ) -> Paillier: r""" Deserialization function for Paillier schemes, which will be passed to the communication module. :param obj: Serialized version of a Paillier scheme. :raise SerializationError: When communication library is not installed. :return: Deserialized Paillier scheme from the given dict. Might not have a secret key when that was not included in the received serialization. """ if not COMMUNICATION_INSTALLED: raise SerializationError() pubkey = obj["pubkey"] precision = obj["prec"] # This piece of code is specifically used for the case where sending and receiving # happens between hosts running the same python instance (local network). # In this case, the Paillier scheme that was sent is already available before it # arrives and does not need to be created anymore. identifier = Paillier.id_from_arguments(public_key=pubkey, precision=precision) if identifier in Paillier._instances: paillier = Paillier.from_id(identifier) else: paillier = Paillier( public_key=pubkey, secret_key=obj["seckey"] if "seckey" in obj else None, precision=precision, ) return paillier
# endregion def _to_bytes(n: typing.SupportsInt) -> bytes: """ Unidirectional conversion from numbers.Integral to bytes. :param n: Integer to convert. :return: Byte representation of provided input. """ from tno.mpc.encryption_schemes.utils import USE_GMPY2 if USE_GMPY2: import gmpy2 if isinstance(n, gmpy2.mpz): return gmpy2.to_binary(n) n_int = int(n) return n_int.to_bytes(n_int.bit_length() // 8 + 1, "little") if COMMUNICATION_INSTALLED: try: Serializer.register_class(Paillier) Serializer.register_class(PaillierCiphertext) Serializer.register_class(PaillierPublicKey) Serializer.register_class(PaillierSecretKey) except RepetitionError: pass