from collections import Counter
from abc import ABC, abstractmethod
import warnings

from numpy import array, linspace, less, greater, std, argsort
from scipy.signal import argrelextrema
import sklearn.cluster
from sklearn.neighbors import KernelDensity
from sklearn.metrics import max_error, mean_absolute_error
from keras.layers import Input, Dense
from keras.models import Model
from keras.regularizers import l1

# TODO (dmt): Handle algorithms without cluster initialization!
SCIKIT_CLUSTERING_TABLE = {
    sklearn.cluster.KMeans: ("n_clusters", "labels_"),
    sklearn.cluster.Birch: ("n_clusters", "labels_"),
    sklearn.cluster.SpectralClustering: ("n_clusters", "labels_"),
    sklearn.cluster.FeatureAgglomeration: ("n_clusters", "labels_"),
    sklearn.cluster.AgglomerativeClustering: ("n_clusters", "labels_")
}


def log_warning(func):
    """Silence library warnings while the wrapped function runs."""
    def wrapper(*args, **kwargs):
        warnings.filterwarnings("ignore")
        return func(*args, **kwargs)
    return wrapper


class MachineLearningModel(ABC):
    @abstractmethod
    def train(self, data, *args, **kwargs):
        pass


class FilterMethod(MachineLearningModel):
    def __init__(self, model):
        self.__model = model

    @log_warning
    def train(self, data, *args, **kwargs):
        self.__model = self.__model.fit(data)
        return self

    def reduce(self, data):
        # Columns the filter did not select are candidates for removal;
        # `data` is expected to expose its feature count as `learn_cols`.
        indices = set(self.__model.get_support(indices=True))
        return {i for i in range(data.learn_cols)}.difference(indices)


class EmbeddedMethod(MachineLearningModel):
    _ONE_HUNDRED_PERCENT = 100

    def __init__(self, model):
        self.__model = model

    @log_warning
    def train(self, data, *args, **kwargs):
        labels = data.get_column_values("Z")
        self.__model = self.__model.fit(data, labels)
        return self

    def reduce(self):
        importance = self.__model.feature_importances_
        indices = argsort(importance)[::-1]
        # Drop the least important feature only if the importances spread
        # widely enough (relative standard deviation of at least 10 %).
        spread = (self._ONE_HUNDRED_PERCENT * std(importance)) // max(importance)
        if spread >= 10:
            less_relevant = indices[-1]
            return {less_relevant}
        return {}


class ConstructionClusteringMLModel(MachineLearningModel):
    def __init__(self, model):
        self.__model = model
        self._cluster = 2
        self.subject = self.__model.__class__.__name__
        self.abbreviation = self.subject[0:3]

    def get_labels(self):
        return getattr(self.__model,
                       SCIKIT_CLUSTERING_TABLE[type(self.__model)][1])

    @property
    def cluster(self):
        return self._cluster

    @cluster.setter
    def cluster(self, value):
        # Forward the cluster count to the wrapped estimator's own
        # attribute (e.g. `n_clusters` for KMeans).
        setattr(self.__model,
                SCIKIT_CLUSTERING_TABLE[type(self.__model)][0],
                value)
        self._cluster = value

    @property
    def cluster_sizes(self):
        labels = getattr(self.__model,
                         SCIKIT_CLUSTERING_TABLE[type(self.__model)][1])
        return Counter(labels)

    @log_warning
    def train(self, data, *args, **kwargs):
        self.__model.fit(data)
        return self


class ReconstructionConceptualMLModel(MachineLearningModel):
    def __init__(self, model):
        self.__model = model
        self.accuracy = None
        self.subject = None

    @log_warning
    def train(self, data, *args, **kwargs):
        # TODO (dmt): Improve signature of this function!
        # TODO (dmt): Check if fit can handle data.
        labels = args[0]
        self.__model = self.__model.fit(data, labels)
        self.accuracy = self.__model.score(data, labels)
        return self

    def predict(self, data):
        return list(self.__model.predict(data))


class ReconstructionProceduralMLModel(MachineLearningModel):
    def __init__(self, model):
        self.__model = model
        self.mean_error = None
        self.max_error = None
        self.subject = None

    @log_warning
    def train(self, data, *args, **kwargs):
        # TODO (dmt): Provide a better way dealing with
        # zero values as max_abs_label!
        labels = args[0]
        self.__model = self.__model.fit(data, labels)
        predictions = self.__model.predict(data)
        absolute_max_error = max_error(y_true=labels, y_pred=predictions)
        max_abs_label = max(abs(i) for i in labels)
        if max_abs_label == 0:
            raise ValueError("Cannot normalize errors: all labels are zero.")
        # Express both errors as a percentage of the largest label magnitude.
        self.max_error = (absolute_max_error * 100) / max_abs_label
        absolute_mean_error = mean_absolute_error(y_true=labels,
                                                  y_pred=predictions)
        self.mean_error = (absolute_mean_error * 100) / max_abs_label
        return self

    def predict(self, data):
        return list(self.__model.predict(data))


class KernelDensityEstimator(MachineLearningModel):
    def __init__(self, kernel="gaussian", bandwidth=3, gridsize=256):
        self.__model = None
        self.kernel = kernel
        self.bandwidth = bandwidth
        self.gridsize = gridsize

    @log_warning
    def train(self, data, *args, **kwargs):
        # KernelDensity expects a 2-D array of shape (n_samples, n_features).
        reshaped_data = array(data).reshape(-1, 1)
        if self.__model is None:
            self.__model = KernelDensity(kernel=self.kernel,
                                         bandwidth=self.bandwidth)
        self.__model.fit(reshaped_data)
        return self

    def density(self):
        # Evaluate the log-density on an evenly spaced grid over
        # [0, gridsize] (numpy's default of 50 sample points).
        grid = linspace(0, self.gridsize)
        reshaped_grid = grid.reshape(-1, 1)
        return self.__model.score_samples(reshaped_grid)


class Autoencoder(MachineLearningModel):
    def __init__(self):
        self.io_shape = None
        self.target_number = None
        self.targets = None
        self.__model = None
        self.__hidden_outputter = None

    @log_warning
    def train(self, data, *args, **kwargs):
        # Single hidden layer with an L1 activity penalty to encourage a
        # sparse encoding; the network is trained to reproduce its input.
        input_layer = Input(shape=(self.io_shape,))
        hidden_layer = Dense(units=self.target_number,
                             activation='relu',
                             activity_regularizer=l1(0.01))(input_layer)
        output_layer = Dense(units=self.io_shape,
                             activation='linear')(hidden_layer)
        self.__model = Model(input_layer, output_layer)
        self.__model.compile(optimizer='adadelta',
                             loss='mean_squared_error',
                             metrics=['accuracy'])
        self.__hidden_outputter = Model(input_layer, hidden_layer)
        self.__model.fit(data, data,
                         epochs=100,
                         batch_size=2,
                         shuffle=False,
                         validation_data=(data, data),
                         verbose=0)
        self._predict_targets(data)
        return self

    def _predict_targets(self, data):
        # One target per hidden unit: that unit's activations over the
        # whole data set.
        predicts = self.__hidden_outputter.predict(data)
        self.targets = [predicts[:, i]
                        for i in range(self.__model.layers[1].units)]

    @property
    def target_error(self):
        return 1 - self.__model.history.history['accuracy'][-1]


def find_relative_extrema(one_dim_data):
    relative_min_values = argrelextrema(one_dim_data, less)
    relative_max_values = argrelextrema(one_dim_data, greater)
    return relative_min_values, relative_max_values
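

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the module's API):
# the toy `points` array and the bandwidth value are hypothetical; the block
# just shows how the clustering wrapper, the density estimator, and
# find_relative_extrema above fit together.
if __name__ == "__main__":
    points = array([[1.0, 2.0], [1.1, 1.9], [8.0, 8.2], [7.9, 8.1]])

    clusterer = ConstructionClusteringMLModel(sklearn.cluster.KMeans())
    clusterer.cluster = 2  # forwarded to KMeans.n_clusters via the setter
    clusterer.train(points)
    print(clusterer.subject, dict(clusterer.cluster_sizes))

    estimator = KernelDensityEstimator(bandwidth=1).train(points[:, 0])
    minima, maxima = find_relative_extrema(estimator.density())
    print(minima, maxima)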