from collections import Counter
from abc import ABC, abstractmethod

from numpy import array, linspace, less, greater, std, argsort
from scipy.signal import argrelextrema
import sklearn.cluster
from sklearn.neighbors import KernelDensity
from sklearn.metrics import max_error, mean_absolute_error
from keras.layers import Input, Dense
from keras.models import Model
from keras.regularizers import l1

# TODO (dmt): Handle algorithms without cluster initialization!
SCIKIT_CLUSTERING_TABLE = {
    sklearn.cluster.KMeans: ("n_clusters", "labels_"),
    sklearn.cluster.Birch: ("n_clusters", "labels_"),
    sklearn.cluster.SpectralClustering: ("n_clusters", "labels_"),
    sklearn.cluster.FeatureAgglomeration: ("n_clusters", "labels_"),
    sklearn.cluster.AgglomerativeClustering: ("n_clusters", "labels_"),
}


class MachineLearningModel(ABC):
    @abstractmethod
    def train(self, data, *args, **kwargs):
        pass


class FilterMethod(MachineLearningModel):
    def __init__(self, model):
        self.__model = model

    def train(self, data, *args, **kwargs):
        self.__model = self.__model.fit(data)
        return self

    def reduce(self, data):
        # Return the indices of all features the fitted selector rejected.
        feature_count = data.feature_count
        indices = set(self.__model.get_support(indices=True))
        return set(range(feature_count)).difference(indices)


class EmbeddedMethod(MachineLearningModel):
    _ONE_HUNDRED_PERCENT = 100

    def __init__(self, model):
        self.__model = model

    def train(self, data, *args, **kwargs):
        labels = data.get_column_values("Z")
        self.__model = self.__model.fit(data, labels)
        return self

    def reduce(self):
        # Drop the least important feature, but only when the importance
        # scores vary strongly (relative standard deviation of at least 10 %).
        importance = self.__model.feature_importances_
        indices = argsort(importance)[::-1]
        if (self._ONE_HUNDRED_PERCENT * std(importance)) // max(importance) >= 10:
            less_relevant = indices[-1]
            return {less_relevant}
        return set()


class ConstructionClusteringMLModel(MachineLearningModel):
    def __init__(self, model):
        self.__model = model
        self._cluster = 2
        self.abbreviation = self.__model.__class__.__name__[:3]

    def get_labels(self):
        return getattr(self.__model,
                       SCIKIT_CLUSTERING_TABLE[type(self.__model)][1])

    @property
    def cluster(self):
        return self._cluster

    @cluster.setter
    def cluster(self, value):
        setattr(self.__model,
                SCIKIT_CLUSTERING_TABLE[type(self.__model)][0],
                value)
        self._cluster = value

    @property
    def cluster_sizes(self):
        return Counter(self.get_labels())

    def train(self, data, *args, **kwargs):
        self.__model.fit(data)
        return self


class ReconstructionConceptualMLModel(MachineLearningModel):
    def __init__(self, model):
        self.__model = model
        self.accuracy = None
        self.subject = model.__class__.__name__

    def train(self, data, *args, **kwargs):
        # TODO (dmt): Improve signature of this function!
        labels = args[0]
        self.__model = self.__model.fit(data, labels)
        self.accuracy = self.__model.score(data, labels)
        return self

    def predict(self, data):
        return list(self.__model.predict(data))


class ReconstructionProceduralMLModel(MachineLearningModel):
    def __init__(self, model):
        self.__model = model
        self.mean_error = None
        self.max_error = None
        self.subject = model.__class__.__name__

    def train(self, data, *args, **kwargs):
        # TODO (dmt): Provide a better way of dealing with
        # zero values as max_abs_label!
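        # Both errors below are normalised to a percentage of the largest
        # absolute label value, so they are only defined when at least
        # one label is non-zero.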
        labels = args[0]
        self.__model = self.__model.fit(data, labels)
        absolute_max_error = max_error(y_true=labels,
                                       y_pred=self.__model.predict(data))
        max_abs_label = max(abs(i) for i in labels)
        if max_abs_label == 0:
            raise ValueError("Cannot compute relative errors: "
                             "all labels are zero.")
        self.max_error = (absolute_max_error * 100) / max_abs_label
        absolute_mean_error = mean_absolute_error(
            y_true=labels, y_pred=self.__model.predict(data))
        self.mean_error = (absolute_mean_error * 100) / max_abs_label
        return self

    def predict(self, data):
        return list(self.__model.predict(data))


class KernelDensityEstimator(MachineLearningModel):
    def __init__(self, kernel="gaussian", bandwidth=3, gridsize=256):
        self.__model = None
        self.kernel = kernel
        self.bandwidth = bandwidth
        self.gridsize = gridsize

    def train(self, data, *args, **kwargs):
        # KernelDensity expects a 2D array of shape (n_samples, n_features).
        reshaped_data = array(data).reshape(-1, 1)
        if self.__model is None:
            self.__model = KernelDensity(kernel=self.kernel,
                                         bandwidth=self.bandwidth)
        self.__model.fit(reshaped_data)
        return self

    def density(self):
        # Log-density evaluated on an evenly spaced grid over [0, gridsize]
        # (linspace defaults to 50 sample points).
        grid = linspace(0, self.gridsize)
        reshaped_grid = grid.reshape(-1, 1)
        return self.__model.score_samples(reshaped_grid)


class Autoencoder(MachineLearningModel):
    def __init__(self):
        # io_shape and target_number must be set before calling train().
        self.io_shape = None
        self.target_number = None
        self.targets = None
        self.__model = None
        self.__hidden_outputter = None

    def train(self, data, *args, **kwargs):
        # A single hidden layer with an L1 activity penalty encourages
        # sparse encodings; the output layer reconstructs the input.
        inputer = Input(shape=(self.io_shape,))
        hidden = Dense(units=self.target_number,
                       activation='relu',
                       activity_regularizer=l1(0.01))(inputer)
        outputer = Dense(units=self.io_shape, activation='linear')(hidden)
        self.__model = Model(inputer, outputer)
        self.__model.compile(optimizer='adadelta',
                             loss='mean_squared_error',
                             metrics=['accuracy'])
        self.__hidden_outputter = Model(inputer, hidden)
        self.__model.fit(data, data,
                         epochs=100,
                         batch_size=2,
                         shuffle=False,
                         validation_data=(data, data),
                         verbose=0)
        self._predict_targets(data)
        return self

    def _predict_targets(self, data):
        # One target series per hidden unit (the learned encodings).
        predicts = self.__hidden_outputter.predict(data)
        self.targets = [predicts[:, i]
                        for i in range(self.__model.layers[1].units)]

    @property
    def target_error(self):
        return 1 - self.__model.history.history['accuracy'][-1]


def find_relative_extrema(one_dim_data):
    relative_min_values = argrelextrema(one_dim_data, less)
    relative_max_values = argrelextrema(one_dim_data, greater)
    return relative_min_values, relative_max_values
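

# A minimal usage sketch, assuming scikit-learn is installed; the toy data
# below is made up purely for illustration. It is kept behind the __main__
# guard so that importing this module stays side-effect free.
if __name__ == "__main__":
    # Cluster a small 2D data set with one of the supported scikit-learn
    # algorithms and inspect the resulting cluster sizes.
    points = array([[0.0, 0.1], [0.2, 0.0], [9.8, 10.1], [10.0, 9.9]])
    clusterer = ConstructionClusteringMLModel(sklearn.cluster.KMeans())
    clusterer.cluster = 2
    clusterer.train(points)
    print(clusterer.abbreviation, dict(clusterer.cluster_sizes))

    # Estimate a 1D density and locate its relative extrema.
    samples = [1, 2, 2, 3, 40, 41, 42, 43, 100]
    estimator = KernelDensityEstimator(bandwidth=3).train(samples)
    minima, maxima = find_relative_extrema(estimator.density())
    print(minima, maxima)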