Source code for mulearn.optimization

"""Implementation of optimization procedures.

This module module contains the implementations of the optimization processes
behind fuzzy inference.

Once loaded, the module preliminarily verifies that some libraries are
installed (notably, Gurobi and TensorFlow), emitting a warning otherwise.
Note that at least one of these libraries is needed in order to solve the
optimization problems involved in the fuzzy inference process.

"""

import numpy as np
import itertools as it
from collections.abc import Iterable
import logging

import copy

import mulearn.kernel as kernel

logger = logging.getLogger(__name__)

try:
    from gurobipy import LinExpr, GRB, Model, Env, QuadExpr, GurobiError

    gurobi_ok = True
except ModuleNotFoundError:
    # logger.warning('gurobi not available')
    gurobi_ok = False

try:
    import os

    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
    import tensorflow as tf

    tensorflow_ok = True
    from tensorflow.keras.optimizers import Adam

    logging.getLogger('tensorflow').setLevel(logging.ERROR)
except ModuleNotFoundError:
    # logger.warning('tensorflow not available')
    tensorflow_ok = False

try:
    import tqdm

    tqdm_ok = True
except ModuleNotFoundError:
    logger.warning('tqdm not available')
    tqdm_ok = False



[docs]class Solver: """Abstract solver for optimization problems. The base class for solvers is :class:`Solver`: it exposes a method `solve` which delegates the numerical optimization process to an abstract method `solve_problem` and subsequently clips the results to the boundaries of the feasible region. """ def solve_problem(self, *args): pass
[docs] def solve(self, xs, mus, c, k): """Solve optimization phase. Build and solve the constrained optimization problem on the basis of the fuzzy learning procedure. :param xs: Objects in training set. :type xs: iterable :param mus: Membership values for the objects in `xs`. :type mus: iterable :param c: constant managing the trade-off in joint radius/error optimization. :type c: float :param k: Kernel function to be used. :type k: :class:`mulearn.kernel.Kernel` :raises: ValueError if c is non-positive or if xs and mus have different lengths. :returns: `list` -- optimal values for the independent variables of the problem.""" if c <= 0: raise ValueError('c should be positive') mus = np.array(mus) chis = self.solve_problem(xs, mus, c, k) chis_opt = [np.clip(ch, l, u) for ch, l, u in zip(chis, -c * (1 - mus), c * mus)] # noqa return chis_opt
def __eq__(self, other): """Check solver equality w.r.t. other objects.""" return type(self) is type(other) and self.__dict__ == other.__dict__
[docs]class GurobiSolver(Solver): """Solver based on gurobi. Using this class requires that gurobi is installed and activated with a software key. The library is available at no cost for academic purposes (see https://www.gurobi.com/downloads/end-user-license-agreement-academic/). Alongside the library, also its interface to python should be installed, via the gurobipy package. """ default_values = {"time_limit": 10 * 60, "adjustment": 0, "initial_values": None}
[docs] def __init__(self, time_limit=default_values['time_limit'], adjustment=default_values['adjustment'], initial_values=default_values['initial_values']): """ Build an object of type GurobiSolver. :param time_limit: Maximum time (in seconds) before stopping iterative optimization, defaults to 10*60. :type time_limit: int :param adjustment: Adjustment value to be used with non-PSD matrices, defaults to 0. Specifying `'auto'` instead than a numeric value will automatically trigger the optimal adjustment if needed. :type adjustment: float or `'auto'` :param initial_values: Initial values for variables of the optimization problem, defaults to None. :type initial_values: iterable of floats or None """ self.time_limit = time_limit self.adjustment = adjustment self.initial_values = initial_values
[docs] def solve_problem(self, xs, mus, c, k): """Optimize via gurobi. Build and solve the constrained optimization problem at the basis of the fuzzy learning procedure using the gurobi API. :param xs: objects in training set. :type xs: iterable :param mus: membership values for the objects in `xs`. :type mus: iterable :param c: constant managing the trade-off in joint radius/error optimization. :type c: float :param k: kernel computations to be used. :type k: iterable :raises: ValueError if optimization fails or if gurobi is not installed :returns: list -- optimal values for the independent variables of the problem. """ if not gurobi_ok: raise ValueError('gurobi not available') m = len(xs) with Env(empty=True) as env: env.setParam('OutputFlag', 0) env.start() with Model('mulearn', env=env) as model: model.setParam('OutputFlag', 0) model.setParam('TimeLimit', self.time_limit) #model.setParam('NonConvex', 2) if c < np.inf: model.addVars(m, name=[f'chi_{i}' for i in range(m)], lb=-c * (1 - mus), ub=c * mus, vtype=GRB.CONTINUOUS) else: model.addVars(name=[f'chi_{i}' for i in range(m)], vtype=GRB.CONTINUOUS) model.update() chis = np.array(model.getVars()) if self.initial_values is not None: for c, i in zip(chis, self.initial_values): c.start = i obj = QuadExpr() obj.add(chis.dot(k.dot(chis))) obj.add(chis.dot(-1*np.diag(k))) if self.adjustment and self.adjustment != 'auto': obj.add(self.adjustment * chis.dot(chis)) model.setObjective(obj, GRB.MINIMIZE) constEqual = LinExpr() constEqual.add(sum(chis), 1.0) model.addConstr(constEqual == 1) try: model.optimize() except GurobiError as e: print(e.message) if self.adjustment == 'auto': s = e.message a = float(s[s.find(' of ') + 4:s.find(' would')]) logger.warning('non-diagonal Gram matrix, ' f'retrying with adjustment {a}') obj.add(a * chis.dot(chis)) model.setObjective(obj, GRB.MINIMIZE) model.optimize() else: raise e if model.Status != GRB.OPTIMAL: if model.Status == GRB.ITERATION_LIMIT: logger.warning('gurobi: optimization terminated because the total number of simplex \ iterations performed exceeded the value specified in the IterationLimitparameter, \ or because the total number of barrier iterations exceeded the value specified in the BarIterLimit parameter.') elif model.Status == GRB.NODE_LIMIT: logger.warning('gurobi: optimization terminated because the total number of \ branch-and-cut nodes explored exceeded the value specified in the NodeLimit parameter.') elif model.Status == GRB.TIME_LIMIT: logger.warning('gurobi: optimization terminated because the time expended \ exceeded the value specified in the TimeLimit parameter.') elif model.Status == GRB.SUBOPTIMAL: logger.warning('gurobi: optimization terminated with a sub-optimal solution!') else: logger.warning(f'gurobi: optimal solution not found! ERROR CODE: {model.Status}') return [ch.x for ch in chis]
def __repr__(self): args = [] for a in self.default_values: if self.__getattribute__(a) != self.default_values[a]: args.append(f"{a}={self.__getattribute__(a)}") return f"{self.__class__.__name__}({', '.join(args)})"
[docs]class TensorFlowSolver(Solver): """Solver based on TensorFlow. Using this class requires that TensorFlow 2.X is installed.""" default_values = {"initial_values": "random", "init_bound": 0.1, "n_iter": 100, "optimizer": Adam(learning_rate=1e-3) if tensorflow_ok else None, # noqa "tracker": tqdm.trange if tqdm_ok else range}
[docs] def __init__(self, initial_values=default_values["initial_values"], init_bound=default_values["init_bound"], n_iter=default_values["n_iter"], optimizer=default_values["optimizer"], tracker=default_values["tracker"]): """Build an object of type TensorFlowSolver. :param initial_values: values to be used for initializing the independent variables, either randomly (if set to `'random'`)` or seting them to a given sequence of initial values (if set to an iterable of floats), defaults to `'random'`. :type initial_values: `str` or iterable of `float` :param init_bound: Absolute value of the extremes of the interval used for random initialization of independent variables, defaults to 0.1. :type init_bound: `float` :param n_iter: Number of iterations of the optimization process, defaults to 100. :type n_iter: `int` :param optimizer: Optimization algorithm to be used, defaults to Adam with learning rate=1e-4 if tensorflow is available, to None otherwise. :type optimizer: :class:`tf.keras.optimizers.Optimizer` :param tracker: Tool to graphically depict the optimization progress, defaults to `tqdm.trange` if tqdm is available, to `range` (i.e., no graphical tool) otherwise. :type tracker: `object` """ self.init_bound = init_bound self.initial_values = initial_values self.n_iter = n_iter self.optimizer = optimizer self.tracker = tracker
[docs] def solve_lagrange_relaxation(self, Q, q, A, b, C, d, max_iter=10, max_gap=10**-2, alpha_0 = 10**-3, window_width = 10, verbose=False): """Solves the lagrangian relaxation for a constrained optimization problem and returns its result. The structure of the primal problem is the following min x.T Q x + q.T x subject to A x = b C x <= d where .T denotes the transposition operator. Optimization takes place in a iterated two-steps procedure: an outer process devoted to modifying the values of the lagrange multipliers, and an inner process working on the primal variables. The arguments are as follows, given n as the number of variables of the primal problem (i.e., the length of x) - Q: n x n matrix containing the quadratic coefficients of the cost function; - q: vector containing the n linear coefficients of the cost function - A: s x n matrix containing the coefficients of the = constraints - b: vector containing the s right members of the = constraints - C: t x n matrix containing the coefficients of the <= constraings - d: vector containing the t right members of the <= coefficients - max_iter: maximum number of iterations of the *outer* optimization procedure - max_gap: maximum gap between primal and dual objectives ensuring premature end of the *outer* optimization procedure - alpha_0: initial value of the learning rate in the *outer* optimization procedure - window_width: width of the moving window on the objective function for the *inner* optimization process - verbose: boolean flag triggering verbose output returns """ #TODO add possibility to specify initial values x = tf.Variable(np.random.random(len(q)), name='x', trainable=True, dtype=tf.float32) Q = tf.constant(np.array(Q), dtype='float32') q = tf.constant(np.array(q), dtype='float32') A = np.array(A) s = len(A) C = np.array(C) b = np.array(b) d = np.array(d) M = np.vstack([A, -A, C]) m = np.hstack([b, -b, d]) lambda_ = tf.constant(np.random.random(len(m)), dtype='float32') M = tf.constant(M, dtype='float32') m = tf.constant(m, dtype='float32') def original_objective(): def obj(): return tf.tensordot(tf.linalg.matvec(Q, x), x, axes=1) + \ tf.tensordot(q, x, axes=1) return obj def lagrangian_objective(lambda_): def obj(): return tf.tensordot(tf.linalg.matvec(Q, x), x, axes=1) + \ tf.tensordot(q, x, axes=1) + \ tf.tensordot(lambda_, m - tf.linalg.matvec(M, x), axes=1) return obj obj_val = [] lagr_val = [] gap_val = [] gap = max_gap + 1 num_bad_iterations = 0 prev_orig = np.inf i = 0 while i < max_iter and (gap<0 or gap > max_gap): lagr_obj = lagrangian_objective(lambda_) orig_obj = original_objective() prev_lagr = 10*3 curr_lagr = 0 vals = [] t = 0 window_width = 30 window = list(np.logspace(1, window_width, window_width)) # this is to ensure a high value for the standard deviation # of the elements to which the window has been initialized while (np.std(window)/abs(np.mean(window)) > 0.001 or t < 100) \ and t < 1000: self.optimizer.minimize(lagr_obj, var_list=x) prev_lagr = curr_lagr curr_lagr = lagr_obj().numpy() vals.append(curr_lagr) t += 1 window = window[1:] window.append(curr_lagr) curr_orig = orig_obj().numpy() if curr_orig < prev_orig: num_bad_iterations += 1 prev_orig = curr_orig obj_val.append(curr_orig) lagr_val.append(curr_lagr) subgradient = (m - tf.linalg.matvec(M, x)).numpy() gap = tf.tensordot(lambda_[:2*s], m[:2*s] - \ tf.linalg.matvec(M[:2*s], x), axes=1).numpy() gap_val.append(gap) if verbose and i%1 == 0: print(f'i={i}, dual={lagr_obj().numpy():.3f}, ' f'prim={orig_obj().numpy():.3f}, ' f'gap={gap:.6f}') alpha = alpha_0 / num_bad_iterations lambda_ = tf.maximum(0, lambda_ + alpha * subgradient) i += 1 return obj_val, lagr_val, x, lambda_, gap_val
def solve_problem(self, xs, mus, c, k): if not tensorflow_ok: raise ValueError('tensorflow not available') m = len(xs) if self.initial_values == 'random': alphas = [tf.Variable(ch, name=f'alpha_{i}', trainable=True, dtype=tf.float32) for i, ch in enumerate(np.random.uniform( \ -self.init_bound, self.init_bound, m))] betas = [tf.Variable(ch, name=f'beta_{i}', trainable=True, dtype=tf.float32) for i, ch in enumerate(np.random.uniform( \ -self.init_bound, self.init_bound, m))] elif isinstance(self.initial_values, Iterable): alphas = [tf.Variable(ch, name=f'alpha_{i}', trainable=True, dtype=tf.float32) for i, ch in enumerate(self.initial_values)] betas = [tf.Variable(ch, name=f'beta_{i}', trainable=True, dtype=tf.float32) for i, ch in enumerate(self.initial_values)] else: raise ValueError("`initial_values` should either be set to " "'random' or to a list of initial values.") x = alphas + betas K11 = np.array([[-mu_i * mu_j for mu_j in mus] for mu_i in mus]) * k K00 = np.array([[-(1-mu_i) * (1 - mu_j) for mu_j in mus] for mu_i in mus]) * k K01 = np.array([[2 * mu_i * (1-mu_j) for mu_j in mus] for mu_i in mus]) * k Z = np.zeros((m, m)) Q = -np.vstack((np.hstack((K11, K01)), np.hstack((Z, K00)))) q = -np.hstack((np.diag(k) * mus, np.diag(k) * (1 - mus))) A = np.array([np.hstack((mus, 1-mus))]) b = np.array([1]) C = np.vstack((np.hstack((np.identity(m), np.zeros((m, m)))), np.hstack((np.zeros((m, m)), -np.identity(m))))) d = np.hstack((np.zeros(m), - c * np.ones(m))) x = np.random.random(2*m) assert(Q.shape == (2*m, 2*m)) assert(len(q) == 2*m) assert(type(x @ (Q @ x) + q @ x) is np.float64) assert(A.shape == (1, 2*m)) assert(len(b) == 1) assert(type(A@x - b) == np.ndarray) assert(len(A@x - b) == 1) assert(C.shape == (2*m, 2*m)) assert(len(d) == 2*m) assert(type(C @ x - d) == np.ndarray) assert(len(C @ x - d) == 2*m) _, _, x, _, _ = self.solve_lagrange_relaxation(Q, q, A, b, C, d, verbose=True) alphas = np.array(x[:m]) betas = np.array(x[m:]) return alphas * np.array(mus) - betas * (1 - np.array(mus)) def __repr__(self): obj_repr = f"TensorFlowSolver(" for a in ("initial_values", "init_bound", "n_iter", "optimizer", "tracker"): if self.__getattribute__(a) != self.default_values[a]: obj_repr += f", {a}={self.default_values[a]}" return obj_repr + ")"