Source code for logistic_regression.client

"""
Logistic regression client
"""

from __future__ import annotations

import logging

import numpy as np
import numpy.typing as npt
import scipy
import scipy.special
import scipy.stats

from tno.mpc.communication import Pool

from tno.fl.protocols.logistic_regression.msg_ids import (
    COEFS,
    INIT_MODEL,
    LOCAL_HESSIAN,
    LOCAL_UPDATE,
    STANDARD_ERROR,
    WEIGHT,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type aliases used in the signatures of this module.
# Client data set: array of 64-bit floats.
DataType = npt.NDArray[np.float64]
# Target labels: array of booleans.
TargetType = npt.NDArray[np.bool_]
# Model coefficients: array of 64-bit floats.
ModelType = npt.NDArray[np.float64]
# First-order derivative of the coefficients on the data.
GradientType = npt.NDArray[np.float64]
# Second-order derivative of the coefficients on the data.
HessianType = npt.NDArray[np.float64]
# Local update exchanged with the server: (gradient, hessian).
UpdateType = tuple[GradientType, HessianType]


class Client:
    """
    The client class, representing data owning clients in the learning process.

    A client sends its weight and, every epoch, its local gradient and Hessian
    to the server, and receives the updated model coefficients in return.
    """

    def __init__(
        self,
        pool: Pool,
        max_iter: int = 25,
        server_name: str = "server",
    ) -> None:
        """
        Initializes the client.

        :param pool: The communication pool.
        :param max_iter: The max number of epochs
        :param server_name: The name of the server
        """
        # Set the pool
        self.pool = pool
        # Set max number of epochs
        self.max_iter = max_iter
        # Set the server name
        self.server_name = server_name

    async def _compute_local_weight(self, data: DataType, target: TargetType) -> float:
        """
        Compute the local weight of a client. Defaults to the length of the
        data set.

        :param data: The data for the client
        :param target: The target data for the client (unused here)
        :return: The weight of the client dataset
        """
        del target
        return len(data)

    async def _send_weight_to_server(self, weight: float) -> None:
        """
        Send the weight (number of rows by default) to the server.

        :param weight: The weight of the client
        """
        await self.pool.send(self.server_name, weight, msg_id=WEIGHT)

    async def _get_initial_model(self, data: DataType, target: TargetType) -> ModelType:
        """
        Computes the initial local model, shares it with the server and
        returns the initial model received back from the server.

        :param data: The data for the client
        :param target: The target data for the client (unused here)
        :return: The initial model as received from the server.
        """
        del target
        # Send local model (all-zero coefficients, one per column) to server
        local_model = np.zeros(data.shape[1])
        await self.pool.send(self.server_name, local_model, msg_id=INIT_MODEL)
        # Receive global initial model from server
        model = np.array(
            await self.pool.recv(self.server_name, msg_id=INIT_MODEL),
            dtype=np.float64,
        )
        return model

    @staticmethod
    def _predicted_probabilities(
        data: DataType, model: ModelType
    ) -> npt.NDArray[np.float64]:
        """
        Compute the predicted probabilities (sigmoid of the linear scores).

        :param data: The data set
        :param model: The coefficients at which to evaluate the model.
        :return: Column vector with one probability per row of the data.
        """
        scores = np.dot(data, model.reshape(-1, 1))
        # expit is the logistic sigmoid; unlike 1 / (1 + exp(-x)) it does not
        # overflow in exp() for strongly negative scores.
        return scipy.special.expit(scores)

    def _compute_gradient(
        self, data: DataType, target: TargetType, model: ModelType
    ) -> GradientType:
        """
        Compute the first-order gradient of the coefficients on the data.

        :param data: The data set
        :param target: The target data
        :param model: The coefficients at which to compute the gradient.
        :return: The gradient as a column vector.
        """
        # Transform labels array to column vector
        target_vector = target[:, np.newaxis]
        # Compute predicted probabilities using sigmoid
        prob = self._predicted_probabilities(data, model)
        # Compute gradient
        gradient: GradientType = np.dot(data.T, (prob - target_vector))
        return gradient

    def _compute_hessian(self, data: DataType, model: ModelType) -> HessianType:
        """
        Compute the second-order derivative of the coefficients on the data.

        :param data: The data set
        :param model: The coefficients at which to compute the hessian.
        :return: The hessian matrix.
        """
        # Compute predicted probabilities using sigmoid
        prob = self._predicted_probabilities(data, model)
        # Scale every row of the data by prob * (1 - prob) instead of building
        # an explicit diagonal weight matrix
        hessian: HessianType = np.dot(np.multiply(data, prob * (1 - prob)).T, data)
        return hessian

    def _compute_update(
        self, data: DataType, target: TargetType, model: ModelType
    ) -> UpdateType:
        """
        Compute the update for the client: the gradient and hessian

        :param data: The data set
        :param target: The target data
        :param model: The coefficients at which to compute the update.
        :return: A tuple containing the gradient and the hessian.
        """
        gradient = self._compute_gradient(data, target, model)
        hessian = self._compute_hessian(data, model)
        return gradient, hessian

    async def _send_update_to_server(self, update: UpdateType) -> None:
        """
        Send the local update (gradient and hessian) to the server

        :param update: The update to be sent to the server
        """
        await self.pool.send(self.server_name, update, msg_id=LOCAL_UPDATE)

    async def _receive_model(self) -> ModelType:
        """
        Receive the updated model from server

        :return: The updated model
        """
        return np.array(
            await self.pool.recv(self.server_name, msg_id=COEFS),
            dtype=np.float64,
        )

    async def _run_epoch(
        self, data: DataType, target: TargetType, model: ModelType
    ) -> ModelType:
        """
        Perform one epoch.

        :param data: The data set
        :param target: The target data
        :param model: The model at the start of the epoch.
        :return: The updated model after the epoch.
        """
        # Compute the update and send to server
        update = self._compute_update(data, target, model)
        await self._send_update_to_server(update)
        # Wait for the updated coefficients and return them
        return await self._receive_model()

    async def run(self, data: DataType, target: TargetType) -> ModelType:
        """
        Perform the learning process.

        :param data: The training data for the client
        :param target: The target data for the client. Must be an array of
            boolean values.
        :return: The resulting model.
        """
        # The server needs the weight of each client (usually number of data points)
        local_weight = await self._compute_local_weight(data, target)
        await self._send_weight_to_server(local_weight)

        # Initialize model
        model = await self._get_initial_model(data, target)

        # Run the learning process
        for epoch in range(self.max_iter):
            # Lazy %-formatting: the message is only built when INFO is enabled
            logger.info("Performing epoch %s/%s", epoch, self.max_iter)
            model = await self._run_epoch(data, target, model)
        return model

    async def compute_standard_error(
        self, data: DataType, target: TargetType, model: ModelType
    ) -> npt.NDArray[np.float64]:
        """
        Compute the standard error for a model.

        :param data: The data set
        :param target: The target data
        :param model: The parameters for which to compute the standard error.
        :return: The standard error
        """
        # Compute Hessian
        hessian = self._compute_hessian(data, model)
        # Share weight and Hessian with server
        weight = await self._compute_local_weight(data, target)
        await self.pool.send(self.server_name, (weight, hessian), msg_id=LOCAL_HESSIAN)
        # Receive standard error
        return np.array(await self.pool.recv(self.server_name, msg_id=STANDARD_ERROR))

    @staticmethod
    def _wald_statistics(
        model: ModelType, standard_errors: npt.NDArray[np.float64]
    ) -> list[dict[str, float]]:
        """
        Compute the z-value and two-sided p-value for each coefficient.

        :param model: The model coefficients
        :param standard_errors: The standard error of each coefficient
        :return: A list containing a dictionary for each covariate with the
            keys 'se', 'z' and 'p'.
        """
        z_values = model / standard_errors
        # The survival function (1 - cdf) is evaluated directly, so very small
        # p-values are not rounded to exactly 0 by cancellation in 1 - cdf.
        p_values = 2 * scipy.stats.norm.sf(abs(z_values))
        return [
            {"se": se, "z": z, "p": p}
            for se, z, p in zip(standard_errors, z_values, p_values)
        ]

    async def compute_statistics(
        self, data: DataType, target: TargetType, model: ModelType
    ) -> list[dict[str, float]]:
        """
        Compute statistics for each coefficient: standard error, z-value and
        p-value.

        :param data: The data set
        :param target: The target data
        :param model: The model for which to compute the statistics
        :return: A list containing a dictionary for each covariate. The
            dictionary contains three values:
            'se' containing the standard error,
            'z' containing z-value (Wald statistic)
            'p' containing the p-value.
        """
        standard_errors = await self.compute_standard_error(data, target, model)
        return self._wald_statistics(model, standard_errors)