Source code for featuristic.synthesis.genetic_feature_synthesis

"""Contains the SymbolicFeatureGenerator class."""

import sys
from typing import List, Union
import matplotlib.pyplot as plt
import matplotlib
import pandas as pd
from joblib import cpu_count
from sklearn.base import BaseEstimator, TransformerMixin
from tqdm import tqdm

from .fitness import fitness_pearson
from .mrmr import MaxRelevanceMinRedundancy
from .population import ParallelPopulation, SerialPopulation
from .program import render_prog
from .symbolic_functions import CustomSymbolicFunction, operations
from .preprocess import preprocess_data


class GeneticFeatureSynthesis(BaseEstimator, TransformerMixin):
    """
    The Genetic Feature Synthesis class uses genetic programming to generate new
    features using a technique based on Symbolic Regression. This is done by initially
    building a population of naive random formulas that represent transformations of
    the input features. The population is then evolved over a number of generations
    using genetic functions such as mutation and crossover to find the best programs
    that minimize a given fitness function. The best features are then identified using
    a Maximum Relevance Minimum Redundancy (mRMR) algorithm to find those features
    that are most correlated with the target variable while being least correlated with
    each other.
    """

    def __init__(
        self,
        num_features: int = 10,
        population_size: int = 100,
        max_generations: int = 25,
        tournament_size: int = 10,
        crossover_proba: float = 0.85,
        parsimony_coefficient: float = 0.001,
        early_termination_iters: int = 15,
        functions: Union[List[str], None] = None,
        custom_functions: Union[List[CustomSymbolicFunction], None] = None,
        return_all_features: bool = True,
        n_jobs: int = -1,
        pbar: bool = True,
        verbose: bool = False,
    ):
        """
        Initialize the Symbolic Feature Generator.

        Args
        ----
        num_features : int
            The number of best features to generate. Internally, a hall of fame of up
            to `5 * num_features` programs is kept and the best `num_features` are
            selected via Maximum Relevance Minimum Redundancy (mRMR).

        population_size : int
            The number of programs in each generation. The larger the population, the
            more likely it is to find a good solution, but the longer it will take.

        max_generations : int
            The maximum number of generations to run. The larger the number of
            generations, the more likely it is to find a good solution, but the longer
            it will take.

        tournament_size : int
            The size of the tournament for selection. The larger the tournament size,
            the more likely it is to select the best program, but the more computation
            it will take.

        crossover_proba : float
            The probability of crossover mutation between selected parents in each
            generation.

        parsimony_coefficient : float
            The parsimony coefficient. Larger values penalize larger programs more and
            encourage smaller programs. This helps prevent bloat where the programs
            become increasingly large and complex without improving the fitness, which
            increases computation complexity and reduces the interpretability of the
            features.

        early_termination_iters : int
            If the best score does not improve for this number of generations, then
            the algorithm will terminate early.

        functions : list
            The list of functions to use in the programs. If `None` then all the
            built-in functions are used. The functions must be the names of the
            functions returned by the `list_symbolic_functions` method.

        custom_functions : list
            A list of custom functions to use in the programs. Each custom function
            must be an instance of the `CustomSymbolicFunction` class.

        return_all_features : bool
            If `True`, `transform` returns the input columns followed by the generated
            features. If `False`, only the generated features are returned.

        n_jobs : int
            The number of parallel jobs to run. If `-1`, use all available cores else
            uses n_jobs. If `n_jobs=1`, then the computation is done in serial.

        pbar: bool
            Whether to show a progress bar.

        verbose : bool
            Whether to print out additional information

        raises
        ------
        ValueError
            If a name in `functions` does not match any built-in symbolic operation.
        """
        if functions is None:
            # Copy the built-in list: custom functions are appended below and must
            # not leak into the shared module-level `operations` object.
            self.functions = list(operations)
        else:
            self.functions = []
            for func in functions:
                matched = next((op for op in operations if op().name == func), None)
                if matched is None:
                    raise ValueError(
                        f"Function '{func}' not found in symbolic operations"
                    )
                self.functions.append(matched)

        # Kept as an attribute because scikit-learn's `get_params` (and therefore
        # `repr`) reads one attribute per constructor argument.
        self.custom_functions = custom_functions
        if custom_functions is not None:
            self.functions.extend(custom_functions)

        self.population_size = population_size
        self.max_generations = max_generations
        self.tournament_size = tournament_size
        self.crossover_proba = crossover_proba
        self.num_features = num_features
        self.parsimony_coefficient = parsimony_coefficient
        self.history = []
        self.hall_of_fame = []
        self.len_hall_of_fame = self.num_features * 5
        self.population = None
        self.early_termination_iters = early_termination_iters
        self.early_termination_counter = 0
        self.return_all_features = return_all_features
        self.fitness_func = fitness_pearson
        self.verbose = verbose
        self.fit_called = False
        self.n_jobs = cpu_count() if n_jobs == -1 else n_jobs
        self.pbar = pbar

    def _update_hall_of_fame(self, fitness: List[float]):
        """
        Merge the current population into the hall of fame.

        A program is added only when its fitness value is not already present in
        the hall of fame. The hall is then sorted by ascending fitness (lower is
        better) and truncated to `len_hall_of_fame` entries.

        Args
        ----
        fitness : list
            The fitness of each program, in the same order as
            `self.population.population`.

        return
        ------
        None
        """
        # Built once and kept in sync, rather than rebuilt for every individual.
        known_fitnesses = [entry["fitness"] for entry in self.hall_of_fame]
        for individual, fit in zip(self.population.population, fitness):
            if fit not in known_fitnesses:
                self.hall_of_fame.append({"individual": individual, "fitness": fit})
                known_fitnesses.append(fit)

        self.hall_of_fame = sorted(self.hall_of_fame, key=lambda x: x["fitness"])
        self.hall_of_fame = self.hall_of_fame[: self.len_hall_of_fame]

    def _select_best_features(self, X: pd.DataFrame, y: pd.Series):
        """
        Select the best features using the mRMR algorithm.

        Every hall-of-fame program is evaluated on `X`, given a `feature_<i>` name,
        and the hall of fame is then reduced to the programs mRMR selects.

        Args
        ----
        X : pd.DataFrame
            The dataframe with the features.

        y : pd.Series
            The target variable.

        return
        ------
        None
        """
        # The hall of fame can hold fewer than `len_hall_of_fame` programs (entries
        # are de-duplicated by fitness), so size everything from its real length.
        n_candidates = len(self.hall_of_fame)

        population = SerialPopulation(
            n_candidates,
            self.functions,
            self.tournament_size,
            self.crossover_proba,
        )
        population.population = [x["individual"] for x in self.hall_of_fame]

        names = [f"feature_{i}" for i in range(n_candidates)]
        features = pd.DataFrame(population.evaluate(X)).T
        features.columns = names

        for entry, name in zip(self.hall_of_fame, names):
            entry["name"] = name

        selected = (
            MaxRelevanceMinRedundancy(
                # Never ask for more features than there are candidates.
                k=min(self.num_features, n_candidates),
                pbar=self.pbar,
            )
            .fit_transform(features, y)
            .columns
        )

        # Column names are `feature_<i>`; recover the hall-of-fame index `i`.
        selected = [int(x.split("_")[1]) for x in selected]
        self.hall_of_fame = [self.hall_of_fame[i] for i in selected]

    def fit(self, X: pd.DataFrame, y: pd.Series) -> "GeneticFeatureSynthesis":
        """
        Fit the symbolic feature generator to the data.

        Any state left over from a previous call to `fit` (history, hall of fame,
        early-termination counter) is discarded before the search starts.

        Args
        ----
        X : pd.DataFrame
            The dataframe with the features.

        y : pd.Series
            The target variable.

        return
        ------
        returns self
        """
        X_copy, y_copy = preprocess_data(
            X.reset_index(drop=True), y.reset_index(drop=True)
        )

        # Start from a clean slate so that re-fitting does not mix in results
        # (or a stale early-termination counter) from an earlier fit.
        self.history = []
        self.hall_of_fame = []
        self.early_termination_counter = 0
        self.fit_called = False

        # Initialize the population
        if self.n_jobs == 1:
            self.population = SerialPopulation(
                self.population_size,
                self.functions,
                self.tournament_size,
                self.crossover_proba,
            ).initialize(X_copy)
        else:
            self.population = ParallelPopulation(
                self.population_size,
                self.functions,
                self.tournament_size,
                self.crossover_proba,
                self.n_jobs,
            ).initialize(X_copy)

        # loss value to minimize
        global_best = sys.maxsize
        best_prog = None

        progress = None
        if self.pbar:
            progress = tqdm(
                total=self.max_generations, desc="Creating new features..."
            )

        try:
            for gen in range(self.max_generations):
                prediction = self.population.evaluate(X_copy)
                scores = self.population.compute_fitness(
                    self.fitness_func, self.parsimony_coefficient, prediction, y_copy
                )

                fitness = []
                for prog, prog_score in zip(self.population.population, scores):
                    fitness.append(prog_score)
                    if prog_score < global_best:
                        global_best = prog_score
                        best_prog = prog
                        # an improvement restarts the patience window
                        self.early_termination_counter = 0

                # update the history
                self.history.append(
                    {
                        "generation": gen,
                        "best_score": global_best,
                        "median_score": pd.Series(fitness).median(),
                        "best_program": render_prog(best_prog),
                    }
                )

                # Update the hall of fame before the early-termination check so the
                # generation that was just evaluated is never thrown away.
                self._update_hall_of_fame(fitness)

                # check for early termination
                self.early_termination_counter += 1
                if self.early_termination_counter >= self.early_termination_iters:
                    if self.verbose:
                        print(
                            f"Early termination at iter {gen}, best error: {global_best:10.6f}"
                        )
                    break

                if progress is not None:
                    progress.update(1)

                self.population.evolve(fitness, X_copy)
        finally:
            # Release the progress bar on every exit path (normal end, early
            # termination, or an exception raised during the search).
            if progress is not None:
                progress.close()

        # select the best features using mrmr
        self._select_best_features(X_copy, y_copy)

        if self.verbose:
            print("Symbolic Feature Generator")
            print(f"Best program: {render_prog(best_prog)}")
            print(f"Best score: {global_best}")

        # we've successfully finished the fit
        self.fit_called = True

        return self

    def transform(self, X: pd.DataFrame, y: pd.Series = None) -> pd.DataFrame:
        """
        Transform the dataframe of features using the best programs found.

        Args
        ----
        X : pd.DataFrame
            The dataframe with the features.

        y : pd.Series
            Not used by this method; accepted so the signature matches `fit`.

        return
        ------
        pd.DataFrame
            The transformed dataframe. Its index is reset to a fresh range index.

        raises
        ------
        ValueError
            If `fit` has not completed successfully.
        """
        if not self.fit_called:
            raise ValueError("Must call fit before transform")

        if self.n_jobs == 1:
            population = SerialPopulation(
                len(self.hall_of_fame),
                self.functions,
                self.tournament_size,
                self.crossover_proba,
            )
        else:
            population = ParallelPopulation(
                len(self.hall_of_fame),
                self.functions,
                self.tournament_size,
                self.crossover_proba,
                self.n_jobs,
            )

        population.population = [x["individual"] for x in self.hall_of_fame]

        X_reset = X.reset_index(drop=True)
        output = pd.DataFrame(population.evaluate(X_reset)).T
        output.columns = [x["name"] for x in self.hall_of_fame]

        if self.return_all_features:
            return pd.concat([X_reset, output], axis=1)

        return output

    def fit_transform(self, X: pd.DataFrame, y: pd.Series = None) -> pd.DataFrame:
        """
        Fit the symbolic feature generator to the data and transform the dataframe
        of features.

        Args
        ----
        X : pd.DataFrame
            The dataframe with the features.

        y : pd.Series
            The target variable.

        return
        ------
        pd.DataFrame
            The transformed dataframe.
        """
        self.fit(X, y)
        return self.transform(X, y)

    def get_feature_info(self) -> pd.DataFrame:
        """
        Get the information about the best programs found.

        return
        ------
        pd.DataFrame
            The dataframe with the information: one row per selected program with
            the columns `name`, `formula` and `fitness`.

        raises
        ------
        ValueError
            If `fit` has not completed successfully.
        """
        if not self.fit_called:
            raise ValueError("Must call fit before get_feature_info")

        return pd.DataFrame(
            [
                {
                    "name": prog["name"],
                    "formula": render_prog(prog["individual"]),
                    "fitness": prog["fitness"],
                }
                for prog in self.hall_of_fame
            ]
        )

    def plot_history(self, ax: Union[matplotlib.axes.Axes, None] = None):
        """
        Plot the history of the fitness function.

        Args
        ----
        ax : matplotlib.axes.Axes
            The axes to draw on. If `None`, a new figure and axes are created.

        return
        ------
        None

        raises
        ------
        ValueError
            If `fit` has not completed successfully.
        """
        if not self.fit_called:
            raise ValueError("Must call fit before plot_history")

        if ax is None:
            _, ax = plt.subplots()

        df = pd.DataFrame(self.history)
        df.plot(x="generation", y=["best_score", "median_score"], ax=ax)
        plt.show()