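"""Adapters wrapping scikit-learn and Keras models behind a small, uniform
``train``/``reduce``/``predict`` interface."""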
from collections import Counter
from abc import ABC, abstractmethod
from functools import wraps
import warnings
from numpy import array, linspace, less, greater, std, argsort
from scipy.signal import argrelextrema
import sklearn.cluster
from sklearn.neighbors import KernelDensity
from sklearn.metrics import max_error, mean_absolute_error
from keras.layers import Input, Dense
from keras.models import Model
from keras.regularizers import l1
# TODO (dmt): Handle algorithms without cluster initialization!
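# Maps each supported estimator type to the name of its cluster-count
# parameter and the name of its fitted-labels attribute.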
SCIKIT_CLUSTERING_TABLE = {
sklearn.cluster.KMeans: ("n_clusters", "labels_"),
    sklearn.cluster.Birch: ("n_clusters", "labels_"),
sklearn.cluster.SpectralClustering: ("n_clusters", "labels_"),
sklearn.cluster.FeatureAgglomeration: ("n_clusters", "labels_"),
sklearn.cluster.AgglomerativeClustering: ("n_clusters", "labels_")
}
def log_warning(func):
    # Silences library warnings for the duration of the wrapped call
    # instead of permanently mutating the global warning filters.
    @wraps(func)
    def wrapper(*args, **kwargs):
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            return func(*args, **kwargs)
    return wrapper
class MachineLearningModel(ABC):
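    """Common base class: every adapter implements ``train``."""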
@abstractmethod
def train(self, data, *args, **kwargs):
pass
class FilterMethod(MachineLearningModel):
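    """Filter-based feature selection around a scikit-learn selector that
    exposes ``get_support`` (e.g. ``SelectKBest``)."""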
def __init__(self, model):
self.__model = model
@log_warning
def train(self, data, *args, **kwargs):
self.__model = self.__model.fit(data)
return self
    def reduce(self, data):
        # Report the column indices the selector did not keep.
        kept = set(self.__model.get_support(indices=True))
        return set(range(data.learn_cols)).difference(kept)
class EmbeddedMethod(MachineLearningModel):
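    """Embedded feature selection through a fitted estimator's
    ``feature_importances_`` (tree ensembles and similar models)."""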
    _ONE_HUNDRED_PERCENT = 100
def __init__(self, model):
self.__model = model
@log_warning
def train(self, data, *args, **kwargs):
labels = data.get_column_values("Z")
self.__model = self.__model.fit(data, labels)
return self
    def reduce(self):
        importance = self.__model.feature_importances_
        indices = argsort(importance)[::-1]
        # Drop the least important feature only when the importance spread
        # (standard deviation relative to the maximum) is at least 10%.
        if (self._ONE_HUNDRED_PERCENT*std(importance))//max(importance) >= 10:
            least_relevant = indices[-1]
            return {least_relevant}
        return set()
class ConstructionClusteringMLModel(MachineLearningModel):
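    """Clustering wrapper; the cluster-count parameter and the fitted
    labels are resolved through SCIKIT_CLUSTERING_TABLE."""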
def __init__(self, model):
self.__model = model
self._cluster = 2
self.subject = self.__model.__class__.__name__
self.abbreviation = self.subject[0:3]
    def get_labels(self):
        return getattr(self.__model,
                       SCIKIT_CLUSTERING_TABLE[type(self.__model)][1])
@property
def cluster(self):
return self._cluster
    @cluster.setter
    def cluster(self, value):
        setattr(self.__model,
                SCIKIT_CLUSTERING_TABLE[type(self.__model)][0], value)
        self._cluster = value
    @property
    def cluster_sizes(self):
        return Counter(self.get_labels())
@log_warning
def train(self, data, *args, **kwargs):
self.__model.fit(data)
return self
class ReconstructionConceptualMLModel(MachineLearningModel):
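    """Classification wrapper; ``accuracy`` holds the score on the
    training data after ``train``."""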
def __init__(self, model):
self.__model = model
self.accuracy = None
self.subject = None
@log_warning
def train(self, data, *args, **kwargs):
# TODO (dmt): Improve signature of this function!
# TODO (dmt): Check if fit can handle data.
labels = args[0]
self.__model = self.__model.fit(data, labels)
self.accuracy = self.__model.score(data, labels)
return self
    def predict(self, data):
        return list(self.__model.predict(data))
class ReconstructionProceduralMLModel(MachineLearningModel):
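    """Regression wrapper; ``max_error`` and ``mean_error`` are stored as
    percentages of the largest absolute label value."""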
def __init__(self, model):
self.__model = model
self.mean_error = None
self.max_error = None
self.subject = None
@log_warning
    def train(self, data, *args, **kwargs):
        # TODO (dmt): Provide a better way dealing with
        # zero values as max_abs_label!
        labels = args[0]
        self.__model = self.__model.fit(data, labels)
        max_abs_label = max(abs(i) for i in labels)
        if max_abs_label == 0:
            raise ValueError("All labels are zero; errors cannot be "
                             "expressed relative to the largest label.")
        predictions = self.__model.predict(data)
        # Scale both metrics to percentages of the largest absolute label.
        self.max_error = (max_error(y_true=labels, y_pred=predictions)
                          * 100) / max_abs_label
        self.mean_error = (mean_absolute_error(y_true=labels,
                                               y_pred=predictions)
                           * 100) / max_abs_label
        return self
    def predict(self, data):
        return list(self.__model.predict(data))
class KernelDensityEstimator(MachineLearningModel):
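    """Univariate kernel density estimation on top of scikit-learn's
    ``KernelDensity``."""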
def __init__(self, kernel="gaussian", bandwidth=3, gridsize=256):
self.__model = None
self.kernel = kernel
self.bandwidth = bandwidth
self.gridsize = gridsize
@log_warning
def train(self, data, *args, **kwargs):
        reshaped_data = array(data).reshape(-1, 1)
        if self.__model is None:
            self.__model = KernelDensity(kernel=self.kernel,
                                         bandwidth=self.bandwidth)
        self.__model.fit(reshaped_data)
return self
    def density(self):
        # Evaluate the fitted log-density on an evenly spaced grid;
        # gridsize determines the number of samples, not just the
        # upper bound (linspace would otherwise default to 50 points).
        grid = linspace(0, self.gridsize, num=self.gridsize)
        reshaped_grid = grid.reshape(-1, 1)
        return self.__model.score_samples(reshaped_grid)
class Autoencoder(MachineLearningModel):
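    """Single-hidden-layer Keras autoencoder. ``io_shape`` and
    ``target_number`` must be set before calling ``train``; the hidden
    activations are exposed as ``targets`` afterwards."""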
def __init__(self):
self.io_shape = None
self.target_number = None
self.targets = None
self.__model = None
self.__hidden_outputter = None
@log_warning
def train(self, data, *args, **kwargs):
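        # Build an input -> sparse hidden -> linear reconstruction stack;
        # the L1 activity regularizer encourages sparse hidden codes.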
inputer = Input(shape=(self.io_shape, ))
hidden = Dense(units=self.target_number,
activation='relu',
activity_regularizer=l1(0.01))(inputer)
outputer = Dense(units=self.io_shape,
activation='linear')(hidden)
self.__model = Model(inputer, outputer)
self.__model.compile(optimizer='adadelta',
loss='mean_squared_error',
metrics=['accuracy'])
self.__hidden_outputter = Model(inputer, hidden)
self.__model.fit(data,
data,
epochs=100,
batch_size=2,
shuffle=False,
validation_data=(data, data),
verbose=0)
self._predict_targets(data)
return self
def _predict_targets(self, data):
predicts = self.__hidden_outputter.predict(data)
self.targets = [predicts[:, i]
for i in range(self.__model.layers[1].units)]
@property
def target_error(self):
return 1 - self.__model.history.history['accuracy'][-1]
def find_relative_extrema(one_dim_data):
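    """Return the indices of the local minima and maxima of a 1-D array."""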
relative_min_values = argrelextrema(one_dim_data, less)
relative_max_values = argrelextrema(one_dim_data, greater)
return relative_min_values, relative_max_values
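# Minimal usage sketch (illustrative only): the clustering adapter with a
# KMeans backend. The other adapters expect the project's own data object
# (providing ``learn_cols`` / ``get_column_values``); a plain feature
# matrix is assumed to suffice for clustering here.
#
#     model = ConstructionClusteringMLModel(sklearn.cluster.KMeans())
#     model.cluster = 3
#     model.train(feature_matrix)
#     print(model.cluster_sizes)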