Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ml_adapter.py 6.98 KiB
from abc import ABC, abstractmethod
from collections import Counter
import functools
import warnings

from numpy import array, linspace, less, greater, std, argsort
from scipy.signal import argrelextrema
import sklearn.cluster
from sklearn.neighbors.kde import KernelDensity
from sklearn.metrics import max_error, mean_absolute_error
from keras.layers import Input, Dense
from keras.models import Model
from keras.regularizers import l1


# TODO (dmt): Handle algorithms without cluster initialization!
# Maps a supported scikit-learn clustering class to a pair of attribute
# names: (attribute holding the requested number of clusters,
#         attribute holding the labels after fitting).
# Every class is referenced through the public ``sklearn.cluster`` namespace;
# the private ``sklearn.cluster.birch`` submodule path is not available in
# newer scikit-learn releases.
SCIKIT_CLUSTERING_TABLE = {
    sklearn.cluster.KMeans: ("n_clusters", "labels_"),
    sklearn.cluster.Birch: ("n_clusters", "labels_"),
    sklearn.cluster.SpectralClustering: ("n_clusters", "labels_"),
    sklearn.cluster.FeatureAgglomeration: ("n_clusters", "labels_"),
    sklearn.cluster.AgglomerativeClustering: ("n_clusters", "labels_"),
}


def log_warning(func):
    """Decorate ``func`` so that warnings raised while it runs are ignored.

    The suppression is scoped to the call: the warning filters that were
    active before the call are restored when ``func`` returns or raises, so
    the rest of the program keeps its own warning configuration and the
    filter list does not grow with every call.

    Note: ``warnings.catch_warnings`` manipulates module-level state and is
    documented as not thread-safe.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper with the same name, docstring and call signature as
        ``func`` that forwards all arguments and returns its result.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            return func(*args, **kwargs)
    return wrapper


class MachineLearningModel(ABC):
    """Abstract base for the model adapters defined in this module.

    Inheriting from ``ABC`` is what makes ``@abstractmethod`` effective:
    the base class itself cannot be instantiated, and a subclass must
    implement ``train`` before it can be.
    """

    @abstractmethod
    def train(self, data, *args, **kwargs):
        """Train the wrapped model on ``data``.

        Args:
            data: Training input; its expected form depends on the subclass.
            *args: Subclass-specific positional arguments.
            **kwargs: Subclass-specific keyword arguments.

        Returns:
            The implementations in this module return ``self``.
        """


class FilterMethod(MachineLearningModel):
    """Adapter around a selector object exposing ``fit``/``get_support``."""

    def __init__(self, model):
        # Wrapped selector; only ``fit`` and ``get_support`` are used here.
        self.__model = model

    @log_warning
    def train(self, data, *args, **kwargs):
        """Fit the wrapped selector on ``data`` and return this adapter.

        Extra positional and keyword arguments are accepted but not used.
        """
        fitted = self.__model.fit(data)
        self.__model = fitted
        return self

    def reduce(self, data):
        """Return the column indices not reported by ``get_support``.

        The candidate indices are ``0 .. data.learn_cols - 1``; those listed
        by ``get_support(indices=True)`` are removed from that set.
        """
        supported = set(self.__model.get_support(indices=True))
        candidates = set(range(data.learn_cols))
        return candidates - supported


class EmbeddedMethod(MachineLearningModel):
    """Adapter around a model that exposes ``feature_importances_``."""

    _ONE_HUNDRET_PERCENT = 100

    def __init__(self, model):
        # Wrapped estimator; must offer ``fit`` and ``feature_importances_``.
        self.__model = model

    @log_warning
    def train(self, data, *args, **kwargs):
        """Fit the wrapped model on ``data`` using its "Z" column as labels.

        Extra positional and keyword arguments are accepted but not used.

        Returns:
            This adapter, to allow call chaining.
        """
        labels = data.get_column_values("Z")
        self.__model = self.__model.fit(data, labels)
        return self

    def reduce(self):
        """Return the index of the least important feature, if any.

        The standard deviation of the importances is expressed as a whole
        percentage of the largest importance (floor division). When that
        percentage is at least 10, the feature with the smallest importance
        is proposed for removal.

        Returns:
            A set holding that single feature index, or an empty set. Both
            branches return a ``set`` so callers can treat the result
            uniformly.
        """
        importance = self.__model.feature_importances_
        spread_percent = (
            (self._ONE_HUNDRET_PERCENT * std(importance)) // max(importance)
        )
        if spread_percent >= 10:
            # First entry of the ascending sort == last entry of the
            # descending sort: the least important feature.
            least_relevant = argsort(importance)[0]
            return {least_relevant}
        return set()


class ConstructionClusteringMLModel(MachineLearningModel):
    """Adapter around a clustering model listed in SCIKIT_CLUSTERING_TABLE.

    The table supplies, per model class, the name of the attribute that
    holds the cluster count and the name of the attribute holding labels.
    """

    def __init__(self, model):
        self.__model = model
        self._cluster = 2
        # Class name of the wrapped model and its first three characters.
        self.subject = self.__model.__class__.__name__
        self.abbreviation = self.subject[:3]

    def get_labels(self):
        """Return the labels attribute of the wrapped model."""
        labels_attribute = SCIKIT_CLUSTERING_TABLE[type(self.__model)][1]
        return getattr(self.__model, labels_attribute)

    @property
    def cluster(self):
        """Cluster count last stored on this adapter (initially 2)."""
        return self._cluster

    @cluster.setter
    def cluster(self, value):
        # Write the count to the wrapped model first, then remember it here.
        count_attribute = SCIKIT_CLUSTERING_TABLE[type(self.__model)][0]
        setattr(self.__model, count_attribute, value)
        self._cluster = value

    @property
    def cluster_sizes(self):
        """``Counter`` mapping each label to its number of occurrences."""
        labels_attribute = SCIKIT_CLUSTERING_TABLE[type(self.__model)][1]
        return Counter(getattr(self.__model, labels_attribute))

    @log_warning
    def train(self, data, *args, **kwargs):
        """Fit the wrapped model on ``data`` and return this adapter.

        Extra positional and keyword arguments are accepted but not used.
        """
        self.__model.fit(data)
        return self


class ReconstructionConceptualMLModel(MachineLearningModel):
    """Adapter around a supervised model offering fit/score/predict."""

    def __init__(self, model):
        self.__model = model
        # Both are filled in later: ``accuracy`` by ``train``; ``subject``
        # is not assigned anywhere in this class.
        self.accuracy = None
        self.subject = None

    @log_warning
    def train(self, data, *args, **kwargs):
        """Fit on ``data`` with labels ``args[0]`` and store the score.

        The score is computed on the same ``data``/labels used for fitting
        and saved in ``self.accuracy``. Returns this adapter.
        """
        # TODO (dmt): Improve signature of this function!
        # TODO (dmt): Check if fit can handle data.
        targets = args[0]
        self.__model = self.__model.fit(data, targets)
        self.accuracy = self.__model.score(data, targets)
        return self

    def predict(self, data):
        """Return the wrapped model's predictions for ``data`` as a list."""
        return list(self.__model.predict(data))


class ReconstructionProceduralMLModel(MachineLearningModel):
    """Adapter around a regression-style model offering fit/predict.

    After training, ``max_error`` and ``mean_error`` hold the maximum and
    the mean absolute training error, each expressed as a percentage of the
    largest absolute label value.
    """

    def __init__(self, model):
        self.__model = model
        self.mean_error = None
        self.max_error = None
        self.subject = None

    @log_warning
    def train(self, data, *args, **kwargs):
        """Fit on ``data`` with labels ``args[0]`` and record error metrics.

        Args:
            data: Training input handed to the wrapped model.
            *args: ``args[0]`` must be the iterable of labels.
            **kwargs: Accepted but not used.

        Returns:
            This adapter, to allow call chaining.

        Raises:
            ValueError: If every label is zero, because the errors cannot
                then be scaled by the largest absolute label.
        """
        # TODO (dmt): Provide a better way dealing with
        # zero values as max_abs_label!
        labels = args[0]
        self.__model = self.__model.fit(data, labels)
        # Predict once and reuse the result for both metrics.
        predictions = self.__model.predict(data)
        absolute_max_error = max_error(y_true=labels, y_pred=predictions)
        max_abs_label = max(abs(i) for i in labels)
        if max_abs_label == 0:
            raise ValueError(
                "cannot express errors relative to the labels: "
                "the largest absolute label is zero"
            )
        self.max_error = (absolute_max_error*100)/max_abs_label
        absolute_mean_error = mean_absolute_error(
            y_true=labels, y_pred=predictions)
        self.mean_error = (absolute_mean_error*100)/max_abs_label
        return self

    def predict(self, data):
        """Return the wrapped model's predictions for ``data`` as a list."""
        return list(self.__model.predict(data))


class KernelDensityEstimator(MachineLearningModel):
    """Adapter that lazily creates and fits a ``KernelDensity`` model."""

    def __init__(self, kernel="gaussian", bandwidth=3, gridsize=256):
        self.kernel = kernel
        self.bandwidth = bandwidth
        self.gridsize = gridsize
        # Created on the first call to ``train``.
        self.__model = None

    @log_warning
    def train(self, data, *args, **kwargs):
        """Fit the estimator on ``data`` viewed as a single column.

        The ``KernelDensity`` instance is built on first use with the
        configured kernel and bandwidth and re-fitted on later calls.
        Extra positional and keyword arguments are accepted but not used.
        Returns this adapter.
        """
        column = array(data).reshape(-1, 1)
        if self.__model is None:
            self.__model = KernelDensity(kernel=self.kernel,
                                         bandwidth=self.bandwidth)
        self.__model.fit(column)

        return self

    def density(self):
        """Return ``score_samples`` of the model over a grid from 0 to gridsize.

        NOTE(review): no sample count is passed to ``linspace``, so numpy's
        default count applies and ``gridsize`` only sets the upper bound of
        the grid -- confirm this is intended.
        NOTE(review): scikit-learn documents ``score_samples`` as returning
        log-density values -- confirm callers expect that.
        """
        grid_column = linspace(0, self.gridsize).reshape(-1, 1)
        return self.__model.score_samples(grid_column)


class Autoencoder(MachineLearningModel):
    """Keras autoencoder with one hidden layer between input and output.

    ``io_shape`` (input/output width) and ``target_number`` (hidden units)
    must be assigned by the caller before ``train`` is invoked.
    """

    def __init__(self):
        self.io_shape = None
        self.target_number = None
        self.targets = None
        self.__model = None
        self.__hidden_outputter = None

    @log_warning
    def train(self, data, *args, **kwargs):
        """Build, compile and fit the network to reproduce ``data``.

        ``data`` serves as input, as target and as validation set. After
        fitting, the hidden-layer outputs for ``data`` are stored in
        ``self.targets``. Extra arguments are accepted but not used.
        Returns this adapter.
        """
        source = Input(shape=(self.io_shape, ))
        encoder_layer = Dense(units=self.target_number,
                              activation='relu',
                              activity_regularizer=l1(0.01))
        encoded = encoder_layer(source)
        decoder_layer = Dense(units=self.io_shape, activation='linear')
        decoded = decoder_layer(encoded)

        self.__model = Model(source, decoded)
        self.__model.compile(optimizer='adadelta',
                             loss='mean_squared_error',
                             metrics=['accuracy'])
        # Second model sharing the input and hidden layer, used to read
        # the hidden activations.
        self.__hidden_outputter = Model(source, encoded)

        fit_options = {
            "epochs": 100,
            "batch_size": 2,
            "shuffle": False,
            "validation_data": (data, data),
            "verbose": 0,
        }
        self.__model.fit(data, data, **fit_options)
        self._predict_targets(data)
        return self

    def _predict_targets(self, data):
        """Store one array of hidden-layer outputs per hidden unit."""
        activations = self.__hidden_outputter.predict(data)
        unit_count = self.__model.layers[1].units
        self.targets = [activations[:, unit] for unit in range(unit_count)]

    @property
    def target_error(self):
        """One minus the last recorded 'accuracy' value of the fit history."""
        accuracy_history = self.__model.history.history['accuracy']
        return 1 - accuracy_history[-1]


def find_relative_extrema(one_dim_data):
    """Locate the relative minima and maxima of ``one_dim_data``.

    Returns:
        A pair ``(minima, maxima)``; each element is the result of
        ``argrelextrema`` called with ``less`` and ``greater`` respectively.
    """
    minima, maxima = (
        argrelextrema(one_dim_data, comparator)
        for comparator in (less, greater)
    )
    return minima, maxima