Source code for mulearn

import numpy as np
import copy

from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils import check_random_state
from sklearn.exceptions import NotFittedError

from mulearn.kernel import GaussianKernel
from mulearn.optimization import GurobiSolver
from mulearn.fuzzifier import ExponentialFuzzifier

import logging

import warnings
from scipy.optimize import OptimizeWarning
from sklearn.exceptions import FitFailedWarning

logger = logging.getLogger(__name__)

warnings.filterwarnings("ignore", category=OptimizeWarning)
warnings.filterwarnings("ignore", category=FitFailedWarning)

[docs]class FuzzyInductor(BaseEstimator, RegressorMixin): """FuzzyInductor class."""
[docs] def __init__(self, c=1, k=GaussianKernel(), fuzzifier=ExponentialFuzzifier(), # noqa solver=GurobiSolver(), random_state=None): r"""Create an instance of :class:`FuzzyInductor`. :param c: Trade-off constant, defaults to 1. :type c: `float` :param k: Kernel function, defaults to :class:`GaussianKernel()`. :type k: :class:`mulearn.kernel.Kernel` :param fuzzifier: fuzzifier mapping distance values to membership degrees, defaults to `ExponentialFuzzifier()`. :type fuzzifier: :class:`mulearn.fuzzifier.Fuzzifier` :param solver: Solver to be used to obtain the optimization problem solution, defaults to `GurobiSolver()`. :type solver: :class:`mulearn.optimization.Solver` :param random_state: Seed of the pseudorandom generator. :type random_state: `int` """ self.c = c self.k = k self.fuzzifier = fuzzifier self.solver = solver self.random_state = random_state self.estimated_membership_ = None self.x_to_sq_dist_ = None self.chis_ = None self.gram_ = None self.fixed_term_ = None self.train_error_ = None
def __repr__(self, **kwargs): return f"FuzzyInductor(c={self.c}, k={self.k}, f={self.fuzzifier}, " \ f"solver={self.solver})" def _fix_object_state(self, X, y): """Ensure object consistency.""" self.X = X self.y = y def x_to_sq_dist(x_new): ret = self.k.compute(x_new, x_new) \ - 2 * np.array([self.k.compute(x_i, x_new) for x_i in X]).dot(self.chis_) \ + self.fixed_term_ return ret self.fuzzifier.x_to_sq_dist = x_to_sq_dist chi_SV_index = [i for i, (chi, mu) in enumerate(zip(self.chis_, y)) if -self.c * (1 - mu) < chi < self.c * mu] chi_sq_radius = map(x_to_sq_dist, X[chi_SV_index]) chi_sq_radius = list(chi_sq_radius) if len(chi_sq_radius) == 0: self.estimated_membership_ = None self.train_error_ = np.inf self.chis_ = None logger.warning('No support vectors found') return self self.fuzzifier.sq_radius_05 = np.mean(chi_sq_radius), y) return self.fuzzifier.get_membership()
[docs] def fit(self, X, y, warm_start=False): r"""Induce the membership function starting from a labeled sample. :param X: Vectors in data space. :type X: iterable of `float` vectors having the same length :param y: Membership for the vectors in `X`. :type y: iterable of `float` having the same length of `X` :param warm_start: flag triggering the non reinitialization of independent variables of the optimization problem, defaults to None. :type warm_start: `bool` :raises: ValueError if the values in `y` are not between 0 and 1, if `X` and have different lengths, or if `X` contains elements of different lengths. :returns: self -- the trained model. """ if type(X) is not np.array: X = np.array(X) for e in y: if e < 0 or e > 1: raise ValueError("`y` values should belong to [0, 1]") check_X_y(X, y) self.random_state = check_random_state(self.random_state) if warm_start: check_is_fitted(self, ["chis_"]) if self.chis_ is None: raise NotFittedError("chis variable are set to None") self.solver.initial_values = self.chis_ self.chis_ = self.solver.solve(X, y, self.c, self.k) if type(self.k) is kernel.PrecomputedKernel: idx = X.flatten() self.gram_ = self.k.kernel_computations[idx][:, idx] else: self.gram_ = np.array([[self.k.compute(x1, x2) for x1 in X] for x2 in X]) self.fixed_term_ = np.array(self.chis_).dot( self.estimated_membership_ = self._fix_object_state(X, y) self.train_error_ = np.mean([(self.estimated_membership_(x) - mu) ** 2 for x, mu in zip(X, y)]) return self
[docs] def decision_function(self, X): r"""Compute predictions for the membership function. :param X: Vectors in data space. :type X: iterable of `float` vectors having the same length :returns: array of float -- the predictions for each value in `X`. """ check_is_fitted(self, ['chis_', 'estimated_membership_']) X = check_array(X) return np.array([self.estimated_membership_(x) for x in X]) # noqa
[docs] def predict(self, X, alpha=None): r"""Compute predictions for membership to the set. Predictions are either computed through the membership function (when `alpha` is set to a float in [0, 1]) or obtained via an $\alpha$-cut on the same function (when `alpha` is set to `None`). :param X: Vectors in data space. :type X: iterable of `float` vectors having the same length :param alpha: $\alpha$-cut value, defaults to `None`. :type alpha: float :raises: ValueError if `alpha` is set to a value different from `None` and not included in $[0, 1]$. :returns: array of int -- the predictions for each value in `X`. """ check_is_fitted(self, ['chis_', 'estimated_membership_']) X = check_array(X) mus = np.array([mu for mu in self.decision_function(X)]) if alpha is None: return mus else: if alpha < 0 or alpha > 1: raise ValueError("alpha cut value should belong to [0, 1]" f" (provided {alpha})") return np.array([1 if mu >= alpha else 0 for mu in mus])
[docs] def score(self, X, y, **kwargs): r"""Compute the fuzzifier score. Score is obtained as the opposite of MSE between predicted membership values and labels. :param X: Vectors in data space. :type X: iterable of `float` vectors having the same length :param y: Labels containing the *gold standard* membership values for the vectors in `X`. :type y: iterable of `float` having the same length of `X` :returns: `float` -- opposite of MSE between the predictions for the elements in `X` w.r.t. the labels in `y`. """ check_X_y(X, y) if self.estimated_membership_ is None: return -np.inf else: return -np.mean([(self.estimated_membership_(x) - mu) ** 2 for x, mu in zip(X, y)])
def __getstate__(self): """Return a serializable description of the fuzzifier.""" d = copy.deepcopy(self.__dict__) del d['estimated_membership_'] del d['x_to_sq_dist_'] #print(d) return d def __setstate__(self, d): """Ensure fuzzifier consistency after deserialization.""" self.__dict__ = d try: check_is_fitted(self, ['chis_', 'estimated_membership_']) self._fix_object_state(self.X, self.y) self.__dict__['estimated_membership_'] = self.estimated_membership_ self.__dict__['x_to_sq_dist_'] = self.x_to_sq_dist_ self.fuzzifier.x_to_sq_dist = self.x_to_sq_dist_ except NotFittedError: pass