"""
The generate method generates a set of unique indicator vectors. It internally
chooses between a direct and an indirect method, where the direct method is
efficient for small sets and the indirect method is efficient for large sets.
A demo of the different methods is presented by running the file as a script.
Usage (Python 3.6):
.. code-block:: python
>>> from mpyc.runtime import mpc
>>> import random_unit_vectors
>>>
>>> masked_vectors = random_unit_vectors.generate(5,2)
>>> unmasked_vectors = list(map(lambda x: mpc.run(mpc.output(x)), masked_vectors))
>>> print(unmasked_vectors)
[[0, 1, 0, 0, 0], [0, 0, 0, 1, 0]]
"""
from typing import Any, Awaitable, Coroutine, List, Optional, Sequence, Type
import mpyc.random
from mpyc.runtime import mpc
from tno.mpc.mpyc.stubs.asyncoro import mpc_coro_ignore, returnType
from tno.mpc.mpyc.secure_learning.utils.types import (
Matrix,
SecNumTypesTV,
SeqMatrix,
Vector,
seq_to_list,
)
from tno.mpc.mpyc.secure_learning.utils.util_matrix_vec import matrix_sum
[docs]
async def random_unit_vectors_are_unique(
p: List[Vector[SecNumTypesTV]],
) -> Coroutine[Any, Any, Awaitable[bool]]:
"""
Verify whether all passed unit vectors are unique.
:param p: List of indicator vectors
:return: True if all indicator vectors are unique, False otherwise
"""
q = matrix_sum(p)
return mpc.is_zero_public(mpc.in_prod(q, q) - len(p))
@mpc_coro_ignore
async def _unique_random_unit_vectors(
sectype: Type[SecNumTypesTV],
n: int,
b: int,
maxcounter: int = 30,
counter: int = 1,
) -> List[Vector[SecNumTypesTV]]:
"""
Generate set of random unit vectors.
:param sectype: Secure type of unit vector elements
:param n: Length of unit vector
:param b: Number of unit vectors
:param maxcounter: Maximum number of tries to generate unique vectors
before calling indirect method
:param counter: The counter
:return: Set of random unit vectors
"""
await returnType(sectype, b, n)
p = [mpyc.random.random_unit_vector(sectype, n) for _ in range(b)]
if await random_unit_vectors_are_unique(p):
return p
if counter < maxcounter:
vectors: List[Vector[SecNumTypesTV]] = await _unique_random_unit_vectors(
sectype=sectype, n=n, b=b, maxcounter=maxcounter, counter=counter + 1
)
return vectors
print(
"random_unit_vectors.generate_unique_unit_vectors_direct: \
max number of recursions reached, now calling \
generate_unique_unit_vectors_indirect"
)
return generate_unique_unit_vectors_indirect(sectype=sectype, n=n, b=b)
[docs]
def generate_unique_unit_vectors_direct(
sectype: Type[SecNumTypesTV], n: int, b: int = 1
) -> List[Vector[SecNumTypesTV]]:
"""
Direct approach for generating random indicator vectors. In this approach,
we generate sets of random indicator vectors until all vectors are unique.
Remark: if this method is used often and b is fixed, then the number of
retries leaks statistical information about the total number of samples.
Use the indirect method if this is an issue.
:param sectype: Secure type of unit vector elements
:param n: Length of unit vector
:param b: Number of unit vectors
:return: Set of random unit vectors
"""
vectors: List[Vector[SecNumTypesTV]] = _unique_random_unit_vectors(sectype, n, b)
return vectors
# Indirect method
def _random_binary_permutation(
sectype: Type[SecNumTypesTV], n: int, b: int
) -> Vector[SecNumTypesTV]:
"""
Returns vector of random ones and zeros.
:param sectype: Secure type of unit vector elements
:param n: Length of vector
:param b: Number of ones in vector
:return: Vector with b randomly placed ones and n-b zeros
"""
p = [sectype(1)] * b + [sectype(0)] * (n - b)
permutation: Vector[SecNumTypesTV] = mpyc.random.shuffle(sectype, p) # type: ignore[no-untyped-call]
return permutation
[docs]
def decompose_random_binary_permutation(
s: Vector[SecNumTypesTV], b: int
) -> List[Vector[SecNumTypesTV]]:
"""
Extract a set of indicator vectors from a vector with ones and zeroes.
Every indicator vector denotes the position of a one in the input vector.
Assume that the input vector contains exactly 1 one. We extract an
indicator vector from this vector 's' as follows:
.. code-block:: python
>>> eps = 0
>>> delta = [0]*n
>>> for i in range(n):
>>>\tdelta[i] = (1-eps)*s[i]
>>>\t.delta = eps + delta[i]
The first line in the for-loop copies the i-th element of `s` as long as
`eps` equals zero. This remains true until the loop reaches the index `i*`
such that `s[i] = 1`. Now, `delta[i*]` is set to one and subsequently
`eps` is set to one. As a consequence, all elements of `delta` with index
at least `i*` are set to zero. We have thus obtained a indicator vector
`delta` that has 1-bit on the index that corresponds to the first 1-bit of
`s`.
The above approach can be generalized as to obtain `b` indicator vectors
from a permutation of `b` 1-bits. Note that all these indicator vectors
are unique and, by construction, the indicator vectors are sorted
according to the index of their 1-bit.
:param s: Vector with zeros and ones
:param b: Number of ones in s
:return: Unique set of random indicator vectors
"""
stype = type(s[0])
n = len(s)
eps = [stype(1)] + [stype(0)] * b
d = [[stype(0)] * n for __ in range(b)]
for i in range(n):
for j in range(b):
d[j][i] = eps[j] * (1 - eps[j + 1]) * s[i]
for j in range(b):
eps[j + 1] = eps[j + 1] + d[j][i]
return d
[docs]
def generate_unique_unit_vectors_indirect(
sectype: Type[SecNumTypesTV], n: int, b: int = 1
) -> List[Vector[SecNumTypesTV]]:
"""
Generate a random binary permutation and then extract unique random
indicator vectors from the given permutation.
:param sectype: Secure type of unit vector elements
:param n: Length of unit vector
:param b: Number of unit vectors
:return: Unique set of random indicator vectors
"""
s = _random_binary_permutation(sectype, n, b)
return decompose_random_binary_permutation(s, b)
[docs]
def generate_unique_unit_vectors(
sectype: Type[SecNumTypesTV], n: int, b: int = 1
) -> List[Vector[SecNumTypesTV]]:
"""
Generate a set of unique indicator vectors.
It internally chooses between a direct and an indirect method, where the
direct method is efficient for small sets and the indirect method is
efficient for large sets.
:param sectype: Secure type of unit vector elements
:param n: Length of unit vector
:param b: Number of unit vectors
:raise ValueError: Occurs when 0 <= b <= n when generating indicator
vectors
:return: Unique set of random indicator vectors
"""
if b < 0 or b > n:
raise ValueError(
"Ensure that 0 <= b <= n when generating indicator \
vectors."
)
# elif b < 2.1*math.sqrt(n):
# return generate_unique_unit_vectors_direct(n, b) # If b < c*math.sqrt(n), then (asymptotically) the expected number of recursions
# # is at most exp(c).
# # c = 2 has been determined experimentally (test2) in case of a single party
# else:
# return generate_unique_unit_vectors_indirect(n, b)
# In the multi-party setting, generate_unique_unit_vectors_indirect is several orders of magnitute slower
# (requires too much communication).
return generate_unique_unit_vectors_direct(sectype, n, b)
[docs]
def random_matrix_permutation(
sectype: Type[SecNumTypesTV], row_length: int, rows: Optional[int] = None
) -> List[Vector[SecNumTypesTV]]:
"""
Return a random permutation matrix.
:param sectype: Secure type of unit vector elements
:param row_length: (Row) length of desired matrix
:param rows: Number of rows to return
:return: Random permutation matrix
"""
if rows is None:
rows = row_length
matrix = [[sectype(0)] * row_length for __ in range(row_length)]
for _ in range(row_length):
matrix[_][_] = sectype(1)
return permute_matrix(matrix)[:rows]
[docs]
def permute_matrix(matrix: SeqMatrix[SecNumTypesTV]) -> Matrix[SecNumTypesTV]:
"""
Permute matrix randomly.
:param matrix: Matrix to be permuted
:raise TypeError: Input is not a matrix
:return: Permutated matrix
"""
if not isinstance(matrix[0], Sequence):
raise TypeError("Input is not a matrix.")
matrix = seq_to_list(matrix)
stype = type(matrix[0][0])
rows = len(matrix)
# Knuth shuffling on vectors
for row in range(rows - 1): # Knuth shuffling
y_r = mpyc.random.random_unit_vector(stype, rows - row)
X_r = mpc.matrix_prod([y_r], matrix[row:])[0]
# X_r = matr_sum((mpc_utils.mult_scalar_mul(y_r, X[row:], tr=True))
d_r = mpc.matrix_prod([[v] for v in y_r], [mpc.vector_sub(matrix[row], X_r)])
matrix[row] = X_r
for _ in range(rows - row):
matrix[row + _] = mpc.vector_add(matrix[row + _], d_r[_])
return matrix